Cassandra Tutorials

By Nadim Bahadoor | Last updated: January 1, 2020 at 17:58

In this section, we will show how to connect to and use Apache Cassandra from Scala. So let's get started!

Introduction:

What is Apache Cassandra

This section is perhaps one that is very dear to me, having had the opportunity to use the popular Apache Cassandra NoSQL database back in 2011, when it was still fairly new after being open-sourced by Facebook. It has been an amazing experience to see it evolve over the years, as well as working with its two custom extensions, namely DataStax Enterprise (DSE for short), the commercial platform marketed by DataStax, and Stratio's Cassandra Lucene Index, an open-source project from Stratio. For the purpose of illustration, we'll start off with the free open-source version of Cassandra, and then follow on with DSE. It goes without saying that DataStax has been a major contributor in pushing the platform further, and you will find a lot of useful documentation at DataStax. We will naturally demonstrate various ways of accessing Cassandra through Scala!

 

Before heading on to the installation of Apache Cassandra, it is important to go through a few essentials. Firstly, one of the great features of Apache Cassandra, and one which led to its popularity in a short amount of time, is that it can scale massively. It was designed as such from the ground up, with data being partitioned across various nodes to form a cluster, or ring. This so-called partitioning is a flavor of the Chord protocol, which relies on a distributed hash table: key ranges, or tokens, are assigned amongst the various nodes of the cluster. As a result, so long as you can zero in on the specific node that holds your data points, retrieval is a rather fast, localized operation. What is more, Apache Cassandra has full support for replication among the nodes within a ring, and even across rings. That naturally becomes a fantastic asset for the product: all of a sudden, strict uptime and disaster recovery Service Level Agreements (SLAs) become a walk in the park.
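
To make the partitioning idea concrete, here is a toy Scala sketch of such a hash ring. It is purely illustrative, not Cassandra's actual implementation: the node names, token values and the CRC32 stand-in hash are all ours, whereas Cassandra 3.x actually defaults to the Murmur3 hash over a much larger token range.


import java.util.zip.CRC32

// A toy token ring: a key belongs to the first node whose token is >= the
// key's token, wrapping around the ring - the essence of consistent hashing.
object ToyRing {
  // Hypothetical nodes with hand-picked tokens, for illustration only.
  val ring: Vector[(Long, String)] =
    Vector((1000L, "node-a"), (2000L, "node-b"), (3000L, "node-c"))

  // Stand-in hash (CRC32); Cassandra 3.x defaults to Murmur3.
  def token(key: String): Long = {
    val crc = new CRC32()
    crc.update(key.getBytes("UTF-8"))
    crc.getValue % 4000L
  }

  def ownerOf(key: String): String = {
    val t = token(key)
    ring.find { case (nodeToken, _) => t <= nodeToken }
      .map { case (_, node) => node }
      .getOrElse(ring.head._2) // wrap around past the highest token
  }
}

object ToyRingApp extends App {
  Seq("plain donut", "vanilla donut").foreach { k =>
    println(s"$k -> token=${ToyRing.token(k)}, owner=${ToyRing.ownerOf(k)}")
  }
}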

 

It is nonetheless worth mentioning that its massive scaling capabilities through the partitioning of your data points, and its high availability via replication, certainly come at a cost. Data within a given Cassandra cluster is eventually consistent. While Cassandra allows you to fine-tune the consistency level to suit your given domain, it is without any doubt a classic illustration of the CAP theorem by Professor Eric Brewer. In short, the CAP theorem points out that a distributed storage system cannot simultaneously guarantee all three of the following promises - Consistency, Availability, and Partition tolerance. Cassandra is naturally biased towards Availability and Partition tolerance and, consequently, concedes on strong Consistency.
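
As a quick preview of the Scala code to come, here is a minimal sketch of tuning the consistency level on a per-query basis. It assumes the DataStax Java driver 3.x (artifact com.datastax.cassandra:cassandra-driver-core) and a locally running node without authentication.


import com.datastax.driver.core.{Cluster, ConsistencyLevel, SimpleStatement}

object ConsistencyDemo extends App {
  // Connect to a local single-node cluster.
  val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
  val session = cluster.connect()

  // ONE favors availability and latency; QUORUM trades some of that back
  // for stronger consistency across replicas.
  val stmt = new SimpleStatement("SELECT release_version FROM system.local")
    .setConsistencyLevel(ConsistencyLevel.QUORUM)

  println(session.execute(stmt).one().getString("release_version"))

  session.close()
  cluster.close()
}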

 

Apache Cassandra has matured rapidly over the years, starting with version 1.0, which I was using in 2011, to its latest version of 3.11. We will of course make use of this latest release at the time of writing.

 

Install Apache Cassandra

We continue with the introduction to Apache Cassandra, and what better way than to get started with the installation of the open-source version of Apache Cassandra. You will find a more comprehensive installation guide at the Apache Cassandra installation page. In particular, make sure that you have the appropriate JDK, as well as the corresponding Python version, installed on the machine where you will set up Apache Cassandra. The Python bit is required to leverage some of the handy built-in scripts that ship with Apache Cassandra. In this section, though, we'll head on with the basic instructions, which obviously assume some *nix flavor of operating system.

 

Firstly, you will need to download the latest version of Apache Cassandra, which at the time of this writing is 3.11.4, with the tarball named apache-cassandra-3.11.4-bin.tar.gz. Once the download is complete, you will of course have to place the tarball under a directory, which in my case will be /Users/nbahadoor/Documents/nad/software/cassandra/. Running an ls command or similar in the terminal should show you the compressed tarball.


-rw-r--r--@  1 nbahadoor  staff  41235177  8 May 21:01 apache-cassandra-3.11.4-bin.tar.gz

Next, we'll have to unpack the tarball using the familiar tar -xzvf command.


tar -xzvf apache-cassandra-3.11.4-bin.tar.gz

When the unpacking is done, running another ls command or similar should show that a new folder was created.


-rw-r--r--@  1 nbahadoor  staff  41235177  8 May 21:01 apache-cassandra-3.11.4-bin.tar.gz
drwxr-xr-x  17 nbahadoor  staff       578  8 May 21:10 apache-cassandra-3.11.4

In a real-world enterprise setup, it is very likely that you will end up with multiple versions of Apache Cassandra as you keep up with the latest releases. Going through a rolling upgrade of a live Apache Cassandra cluster with a large amount of data is in itself an interesting task. Yet, we can plan ahead with some basic steps, such as creating a symbolic link that maps the latest version of Apache Cassandra to a given unpacked tarball. In the example below, we use the traditional ln -s command to create a symbolic link named latest that points to the apache-cassandra-3.11.4 version.


ln -s apache-cassandra-3.11.4 latest

You can then navigate to the latest/bin directory, which contains a number of goodies, including the notable cassandra start-up script. More precisely, to boot up an Apache Cassandra node in the foreground, you can use the following command:


./cassandra -f

There are various steps, instructions and protocols that are executed when booting up an instance of Apache Cassandra. For the purpose of our tutorials, we will only have one node that makes up the entire cluster. As a result, you should see some logging similar to the lines below, which indicate that our particular Apache Cassandra node has joined the ring and its state is NORMAL. That naturally implies that our Apache Cassandra node is now up and running and ready to be used ... voila!


INFO  [main] 2019-05-14 20:16:17,928 StorageService.java:1483 - JOINING: Finish joining ring
INFO  [main] 2019-05-14 20:16:18,018 StorageService.java:2327 - Node localhost/127.0.0.1 state jump to NORMAL

We will keep the tutorials in this section fairly short so that you can experiment further in your own time. For now, we will show how to stop your Cassandra node, and for that we will use the familiar kill -9 [your Cassandra PID] command. It certainly goes without saying that stopping a live Cassandra node in a production environment requires some additional steps, such as draining the node first. Since the data in our Cassandra node will be mostly transient, we can go ahead and stop the Cassandra node as follows: (1) use ps -ef | grep cassandra or similar to get the PID of your running Cassandra node, and (2) run kill -9 [your Cassandra PID] to stop the process.

 

Using nodetool

Apache Cassandra comes with a lot of goodies :) which I'm sure you will find super useful, especially if you are the one managing a real-world Cassandra cluster. For today's tutorial, we will explore a utility named nodetool, which you will find under your Cassandra bin directory. It is commonly used to give you various stats and hints about the health of your particular Cassandra cluster, and it also has the ability to perform some mission-critical actions, such as the removal of a particular node from a ring.

 

To start with, you can use the status command from nodetool to give you a visual hint with regards to the states of your nodes. As a reminder, we are running only one node, which essentially makes up the entire ring for our Cassandra cluster as per the previous setup instructions. As a result, the command ./nodetool status displays only one Cassandra node, whose status is UN - that is, Up and Normal. You will also notice the number of Tokens, which plays a key role in the sharding of a Cassandra cluster. Once again, since there is only one node, it currently takes full ownership of all the tokens. In the next tutorial, we will go over the flexibility you have in managing and/or controlling the number of tokens in your Cassandra cluster.


./nodetool status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Tokens       Owns (effective)  Host ID                               Rack
UN  127.0.0.1  316.29 KiB  256          100.0%            9dadce17-559d-4f85-b5e8-ecada4b0e745  rack1

Thus far, we've been using the terms cluster and ring interchangeably. While they are more or less similar, it is worth emphasizing that there can obviously be multiple nodes that form a cluster. There can also be multiple clusters that perhaps represent some real-world physical or virtual datacenters. What is more, these clusters can in turn have various rules that determine the flow of traffic from one cluster to the other through the use of something called a snitch. Even if you are not setting up and/or administering a Cassandra cluster, learning about snitches will no doubt help you better understand how your Cassandra cluster works. You can find some amazing documentation on snitches at none other than the DataStax documentation. For our simple cluster, though, the ./nodetool ring command will show you the default allocation of the various tokens within our Cassandra cluster.


./nodetool ring

Datacenter: datacenter1
==========
Address    Rack        Status State   Load            Owns                Token                                       
                                                                          9100493479507228630                         
127.0.0.1  rack1       Up     Normal  316.29 KiB      100.00%             -9166152129757023078                        
127.0.0.1  rack1       Up     Normal  316.29 KiB      100.00%             -9154610929211871254                        
127.0.0.1  rack1       Up     Normal  316.29 KiB      100.00%             -9112266055606536440                        
127.0.0.1  rack1       Up     Normal  316.29 KiB      100.00%             -9090580030681280721                        
127.0.0.1  rack1       Up     Normal  316.29 KiB      100.00%             -8998457192844186045                        
...

Yet another handy nodetool command is ./nodetool info. As its name implies, it displays some additional health details for your Cassandra ring. You would commonly use this command to verify that, say, you've successfully disabled and/or enabled the live gossip that happens among the nodes in a Cassandra ring. And yes, the nodes in a Cassandra cluster are most certainly alive :) They are constantly talking to each other, so to speak, and this is very much the gossip protocol.


./nodetool info

ID                     : 9dadce17-559d-4f85-b5e8-ecada4b0e745
Gossip active          : true
Thrift active          : false
Native Transport active: true
Load                   : 316.29 KiB
Generation No          : 1557910129
Uptime (seconds)       : 302
Heap Memory (MB)       : 535.84 / 4016.00
Off Heap Memory (MB)   : 0.00
Data Center            : datacenter1
Rack                   : rack1
Exceptions             : 0
Key Cache              : entries 12, size 992 bytes, capacity 100 MiB, 71 hits, 94 requests, 0.755 recent hit rate, 14400 save period in seconds
Row Cache              : entries 0, size 0 bytes, capacity 0 bytes, 0 hits, 0 requests, NaN recent hit rate, 0 save period in seconds
Counter Cache          : entries 0, size 0 bytes, capacity 50 MiB, 0 hits, 0 requests, NaN recent hit rate, 7200 save period in seconds
Chunk Cache            : entries 36, size 2.25 MiB, capacity 480 MiB, 52 misses, 157 requests, 0.669 recent hit rate, 219.342 microseconds miss latency
Percent Repaired       : 100.0%
Token                  : (invoke with -T/--tokens to see all 256 tokens)

Speaking of gossip, nodetool has a special command, ./nodetool gossipinfo, to provide much more detailed stats on this matter. It would be fairly rare to make use of this particular command, unless you've recently added and/or removed a given node within a ring and would like to make sure that the node is back to Normal, or you'd like to spot something suspicious on a node that is not behaving well.


./nodetool gossipinfo

localhost/127.0.0.1
  generation:1557910129
  heartbeat:415
  STATUS:14:NORMAL,-1061636861510287643
  LOAD:390:323886.0
  SCHEMA:10:ea63e099-37c5-3d7b-9ace-32f4c833653d
  DC:6:datacenter1
  RACK:8:rack1
  RELEASE_VERSION:4:3.11.4
  RPC_ADDRESS:3:127.0.0.1
  NET_VERSION:1:11
  HOST_ID:2:9dadce17-559d-4f85-b5e8-ecada4b0e745
  RPC_READY:16:true
  TOKENS:13:

As mentioned earlier, nodetool has a lot more commands and you can use the ./nodetool help command to display the full list.


./nodetool help

usage: nodetool [(-h <host> | --host <host>)]
        [(-u <username> | --username <username>)] [(-p <port> | --port <port>)]
        [(-pwf <passwordFilePath> | --password-file <passwordFilePath>)]
        [(-pw <password> | --password <password>)] <command> [<args>]

The most commonly used nodetool commands are:
    assassinate                  Forcefully remove a dead node without re-replicating any data.  Use as a last resort if you cannot removenode
    bootstrap                    Monitor/manage node's bootstrap process
    cleanup                      Triggers the immediate cleanup of keys no longer belonging to a node. By default, clean all keyspaces
    clearsnapshot                Remove the snapshot with the given name from the given keyspaces. If no snapshotName is specified we will remove all snapshots
    compact                      Force a (major) compaction on one or more tables or user-defined compaction on given SSTables
    compactionhistory            Print history of compaction
    compactionstats              Print statistics on compactions
    decommission                 Decommission the *node I am connecting to*
    describecluster              Print the name, snitch, partitioner and schema version of a cluster
    describering                 Shows the token ranges info of a given keyspace
    disableautocompaction        Disable autocompaction for the given keyspace and table
    disablebackup                Disable incremental backup
    disablebinary                Disable native transport (binary protocol)
    disablegossip                Disable gossip (effectively marking the node down)
    disablehandoff               Disable storing hinted handoffs
    disablehintsfordc            Disable hints for a data center
    disablethrift                Disable thrift server
    drain                        Drain the node (stop accepting writes and flush all tables)
    enableautocompaction         Enable autocompaction for the given keyspace and table
    enablebackup                 Enable incremental backup
    enablebinary                 Reenable native transport (binary protocol)
    enablegossip                 Reenable gossip
    enablehandoff                Reenable future hints storing on the current node
    enablehintsfordc             Enable hints for a data center that was previsouly disabled
    enablethrift                 Reenable thrift server
    failuredetector              Shows the failure detector information for the cluster
    flush                        Flush one or more tables
    garbagecollect               Remove deleted data from one or more tables
    gcstats                      Print GC Statistics
    getcompactionthreshold       Print min and max compaction thresholds for a given table
    getcompactionthroughput      Print the MB/s throughput cap for compaction in the system
    getconcurrentcompactors      Get the number of concurrent compactors in the system.
    getendpoints                 Print the end points that owns the key
    getinterdcstreamthroughput   Print the Mb/s throughput cap for inter-datacenter streaming in the system
    getlogginglevels             Get the runtime logging levels
    getsstables                  Print the sstable filenames that own the key
    getstreamthroughput          Print the Mb/s throughput cap for streaming in the system
    gettimeout                   Print the timeout of the given type in ms
    gettraceprobability          Print the current trace probability value
    gossipinfo                   Shows the gossip information for the cluster
    help                         Display help information
    info                         Print node information (uptime, load, ...)
    invalidatecountercache       Invalidate the counter cache
    invalidatekeycache           Invalidate the key cache
    invalidaterowcache           Invalidate the row cache
    join                         Join the ring
    listsnapshots                Lists all the snapshots along with the size on disk and true size.
    move                         Move node on the token ring to a new token
    netstats                     Print network information on provided host (connecting node by default)
    pausehandoff                 Pause hints delivery process
    proxyhistograms              Print statistic histograms for network operations
    rangekeysample               Shows the sampled keys held across all keyspaces
    rebuild                      Rebuild data by streaming from other nodes (similarly to bootstrap)
    rebuild_index                A full rebuild of native secondary indexes for a given table
    refresh                      Load newly placed SSTables to the system without restart
    refreshsizeestimates         Refresh system.size_estimates
    reloadlocalschema            Reload local node schema from system tables
    reloadtriggers               Reload trigger classes
    relocatesstables             Relocates sstables to the correct disk
    removenode                   Show status of current node removal, force completion of pending removal or remove provided ID
    repair                       Repair one or more tables
    replaybatchlog               Kick off batchlog replay and wait for finish
    resetlocalschema             Reset node's local schema and resync
    resumehandoff                Resume hints delivery process
    ring                         Print information about the token ring
    scrub                        Scrub (rebuild sstables for) one or more tables
    setcachecapacity             Set global key, row, and counter cache capacities (in MB units)
    setcachekeystosave           Set number of keys saved by each cache for faster post-restart warmup. 0 to disable
    setcompactionthreshold       Set min and max compaction thresholds for a given table
    setcompactionthroughput      Set the MB/s throughput cap for compaction in the system, or 0 to disable throttling
    setconcurrentcompactors      Set number of concurrent compactors in the system.
    sethintedhandoffthrottlekb   Set hinted handoff throttle in kb per second, per delivery thread.
    setinterdcstreamthroughput   Set the Mb/s throughput cap for inter-datacenter streaming in the system, or 0 to disable throttling
    setlogginglevel              Set the log level threshold for a given class. If both class and level are empty/null, it will reset to the initial configuration
    setstreamthroughput          Set the Mb/s throughput cap for streaming in the system, or 0 to disable throttling
    settimeout                   Set the specified timeout in ms, or 0 to disable timeout
    settraceprobability          Sets the probability for tracing any given request to value. 0 disables, 1 enables for all requests, 0 is the default
    snapshot                     Take a snapshot of specified keyspaces or a snapshot of the specified table
    status                       Print cluster information (state, load, IDs, ...)
    statusbackup                 Status of incremental backup
    statusbinary                 Status of native transport (binary protocol)
    statusgossip                 Status of gossip
    statushandoff                Status of storing future hints on the current node
    statusthrift                 Status of thrift server
    stop                         Stop compaction
    stopdaemon                   Stop cassandra daemon
    tablehistograms              Print statistic histograms for a given table
    tablestats                   Print statistics on tables
    toppartitions                Sample and print the most active partitions for a given column family
    tpstats                      Print usage statistics of thread pools
    truncatehints                Truncate all hints on the local node, or truncate hints for the endpoint(s) specified.
    upgradesstables              Rewrite sstables (for the requested tables) that are not on the current version (thus upgrading them to said current version)
    verify                       Verify (check data checksum for) one or more tables
    version                      Print cassandra version
    viewbuildstatus              Show progress of a materialized view build

Moreover, each of the above-mentioned nodetool commands in turn has its own set of parameters and sub-commands. You can zero in on those exact parameters by prefixing a particular nodetool command with the help command. For instance, the example below lists the additional parameters that you need when using nodetool's cfstats command, by running ./nodetool help cfstats. By the way, cfstats is essentially short for Column Family stats, a column family being the typical way of creating and/or describing tables in early Cassandra. As of Cassandra 2.0 onwards, the terms column family and table are used interchangeably.


./nodetool help cfstats

NAME
        nodetool cfstats - Print statistics on tables

SYNOPSIS
        nodetool [(-h <host> | --host <host>)] [(-p <port> | --port <port>)]
                [(-pw <password> | --password <password>)]
                [(-pwf <passwordFilePath> | --password-file <passwordFilePath>)]
                [(-u <username> | --username <username>)] cfstats
                [(-F <format> | --format <format>)] [(-H | --human-readable)] [-i] [--]
                [<keyspace.cfname>...]

OPTIONS
        -F <format>, --format <format>
            Output format (json, yaml)

        -h <host>, --host <host>
            Node hostname or ip address

        -H, --human-readable
            Display bytes in human readable form, i.e. KiB, MiB, GiB, TiB

        -i
            Ignore the list of tables and display the remaining tables

        -p <port>, --port <port>
            Remote jmx agent port number

        -pw <password>, --password <password>
            Remote jmx agent password

        -pwf <passwordFilePath>, --password-file <passwordFilePath>
            Path to the JMX password file

        -u <username>, --username <username>
            Remote jmx agent username

        --
            This option can be used to separate command-line options from the
            list of argument, (useful when arguments might be mistaken for
            command-line options

        [<keyspace.cfname>...]
            List of tables (or keyspace) names

 

cassandra.yaml

In the preceding posts, we've mentioned a number of terms when it comes to using Apache Cassandra. These include, for instance, a cluster (also commonly referred to as a ring), the number of tokens within a ring, and the setting up and use of a snitch. As a matter of fact, all of these form part of the configuration of a particular Cassandra cluster. It is necessary, therefore, that such settings be configurable according to your particular domain and/or requirements.

 

Without further ado, you will be pleased to know that these configurations can be found and modified in the cassandra.yaml file, which is located under the /conf directory of your Apache Cassandra installation. In a real-world enterprise Cassandra ring, you would typically have to version the cassandra.yaml file, and/or restrict access to it, as any modifications made through this file would of course alter the behavior of your particular Cassandra ring. To change a particular setting, you simply have to open the file with your favorite editor, such as vi or similar, and make the necessary updates.

 

For instance, one of the very first settings you will see after opening the cassandra.yaml file is cluster_name, which currently defaults to the "Test Cluster" literal.


cluster_name: 'Test Cluster'

Next up, you will see the num_tokens which, as its name implies, fixes the number of tokens that will be assigned for this particular Cassandra node. The default value is 256, and while it is a pretty good value to start with, your actual production value will very much depend on a number of heterogeneous factors.


num_tokens: 256

So far, we have not been asked to enter any particular credentials when connecting to our single-node Cassandra ring. This is because the authenticator setting by default allows anyone free-form access to the cluster. That is obviously great when experimenting with a Cassandra cluster, but you will naturally have to modify this setting to something else, such as the PasswordAuthenticator - this will enforce that a username and password be entered before being able to communicate with the Cassandra cluster. We therefore use the # symbol to comment out authenticator: AllowAllAuthenticator, and set authenticator: PasswordAuthenticator instead.


# authenticator: AllowAllAuthenticator
authenticator: PasswordAuthenticator

Another important setting is the partitioner, which is essentially responsible for the sharding heuristics of your Cassandra cluster. The default partitioner has certainly evolved over the years, and at the time of writing it is the Murmur3Partitioner. Similar to the number of tokens, the actual partitioner of choice for your real-world Cassandra cluster will very much depend on numerous heterogeneous factors, including the shape of your underlying data points. Using the default Murmur3Partitioner is of course a good place to start.


partitioner: org.apache.cassandra.dht.Murmur3Partitioner

Our single-node Cassandra cluster is without question a toy installation at the moment. In a large cluster, though, you will normally designate a few nodes as seed nodes. Any node joining or leaving the cluster would then communicate with the seed nodes with regards to its joining or leaving protocol, as opposed to fanning out a bunch of messages across the entire ring.


seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          - seeds: "127.0.0.1"

The listen_address is also a common setting that you will modify to either a physical or logical address for your given infrastructure.


listen_address: localhost

Selecting and configuring Cassandra snitches would undoubtedly take more space than we have here. But Apache Cassandra comes with a number of handy ones, namely SimpleSnitch, GossipingPropertyFileSnitch, PropertyFileSnitch, Ec2Snitch, Ec2MultiRegionSnitch and RackInferringSnitch. What is more, if you are using DataStax Enterprise (DSE), you will benefit from additional snitches specific to the DSE platform.


endpoint_snitch: SimpleSnitch

The cassandra.yaml file has other settings that you would most certainly have to fine-tune for a real-world Cassandra cluster.

 

cqlsh

We are now ready to experiment a bit further with our Apache Cassandra single-node cluster. In this section, we'll illustrate the use of a handy utility named cqlsh, which can be found under the /bin directory of your Cassandra installation. You will, however, need to provide a username and a password before gaining access to cqlsh, which is also referred to as the CQL Shell. The reason for the login credentials is that we've enabled authenticator: PasswordAuthenticator in the cassandra.yaml configuration file (remember that you will have to restart your Cassandra node for cassandra.yaml changes to take effect). You can use the default cassandra username along with the cassandra password to log in. It nonetheless goes without saying that in a real-world enterprise Cassandra installation, you would most certainly have a different username and password, or even a different authentication and authorization mechanism.


./cqlsh localhost -u cassandra -p cassandra
Connected to Test Cluster at localhost:9042.
[cqlsh 5.0.1 | Cassandra 3.11.4 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help

Once logged in, you will notice a welcome message similar to the one above, followed by a command-line prompt. Within this shell, you can naturally execute CQL statements and, to begin with, we'll display the help screen by running the help; command. Notice also that you are required to terminate each statement with a semi-colon (;).


cassandra@cqlsh> help;

Documented shell commands:
===========================
CAPTURE  CLS          COPY  DESCRIBE  EXPAND  LOGIN   SERIAL  SOURCE   UNICODE
CLEAR    CONSISTENCY  DESC  EXIT      HELP    PAGING  SHOW    TRACING

CQL help topics:
================
AGGREGATES               CREATE_KEYSPACE           DROP_TRIGGER      TEXT     
ALTER_KEYSPACE           CREATE_MATERIALIZED_VIEW  DROP_TYPE         TIME     
ALTER_MATERIALIZED_VIEW  CREATE_ROLE               DROP_USER         TIMESTAMP
ALTER_TABLE              CREATE_TABLE              FUNCTIONS         TRUNCATE 
ALTER_TYPE               CREATE_TRIGGER            GRANT             TYPES    
ALTER_USER               CREATE_TYPE               INSERT            UPDATE   
APPLY                    CREATE_USER               INSERT_JSON       USE      
ASCII                    DATE                      INT               UUID     
BATCH                    DELETE                    JSON            
BEGIN                    DROP_AGGREGATE            KEYWORDS        
BLOB                     DROP_COLUMNFAMILY         LIST_PERMISSIONS
BOOLEAN                  DROP_FUNCTION             LIST_ROLES      
COUNTER                  DROP_INDEX                LIST_USERS      
CREATE_AGGREGATE         DROP_KEYSPACE             PERMISSIONS     
CREATE_COLUMNFAMILY      DROP_MATERIALIZED_VIEW    REVOKE          
CREATE_FUNCTION          DROP_ROLE                 SELECT          
CREATE_INDEX             DROP_TABLE                SELECT_JSON     

cassandra@cqlsh>

Next, we can pull up the default users that are currently set up in our Cassandra installation by using the list users; command. We would obviously expect only one user, named cassandra, which we used to log in.


cassandra@cqlsh> list users;
INFO  [Native-Transport-Requests-1] 2019-05-20 20:36:51,891 AuthCache.java:161 - (Re)initializing RolesCache (validity period/update interval/max entries) (2000/2000/1000)

 name      | super
-----------+-------
 cassandra |  True

cassandra@cqlsh>

The above looks and feels like a normal table that you would find in traditional database systems. In other words, there is a column named name and another one named super, holding the values cassandra and True, respectively. Yet, we should recall that a typical Cassandra setup runs as a cluster, and that the rows of tables are essentially scattered across the various nodes of the cluster. We will go over table structures in much more detail later on.

 

Speaking of tables, these typically reside within some schema in a relational database system. Similarly, tables in Cassandra are grouped within a given keyspace. A keyspace forms an important part of your Cassandra cluster as it drives, for instance, the replication strategy. For now, we'll go ahead and create a simple keyspace with the SimpleStrategy and a replication_factor of 1, since we only have one Cassandra node running.


create keyspace donutstore with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

cassandra@cqlsh>

The above will create a keyspace named donutstore which will host the tables and other structures for the purpose of our Cassandra tutorials. You can further explore the built-in system keyspaces with the describe keyspaces; command.


cassandra@cqlsh> describe keyspaces;

system_schema  system      system_distributed
system_auth    donutstore  system_traces     

To navigate to one of the above keyspaces, such as our donutstore keyspace, you will have to run the use donutstore; command. You will notice the cqlsh prompt changes to: cassandra@cqlsh:donutstore>.


cassandra@cqlsh> use donutstore;
cassandra@cqlsh:donutstore> 

Within the donutstore keyspace, you can run the describe tables; command to view the tables defined within that particular keyspace. Obviously, there should be none as we have not created any yet!


cassandra@cqlsh:donutstore> describe tables;



cassandra@cqlsh:donutstore>

We can therefore go ahead and create a basic table named simple_donut, which has the following columns: name, price and ingredients, where the primary key for this illustration is the name column. We will cover keys in more detail in upcoming tutorials, as they form an integral part of how your data is stored and retrieved across your given Cassandra cluster.


cassandra@cqlsh:donutstore> create table simple_donut(name text, price decimal, ingredients set<text>, primary key(name));

cassandra@cqlsh:donutstore>

With the simple_donut table created, you can use the describe simple_donut command to view additional details that are associated with the table. If you have never used Cassandra before, you might feel overwhelmed by these details, from caching and compression to bloom filters. We will cover these as we go along with our Cassandra tutorials.


cassandra@cqlsh:donutstore> describe simple_donut;

CREATE TABLE donutstore.simple_donut (
 name text PRIMARY KEY,
 ingredients set<text>,
 price decimal
) WITH bloom_filter_fp_chance = 0.01
 AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
 AND comment = ''
 AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
 AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
 AND crc_check_chance = 1.0
 AND dclocal_read_repair_chance = 0.1
 AND default_time_to_live = 0
 AND gc_grace_seconds = 864000
 AND max_index_interval = 2048
 AND memtable_flush_period_in_ms = 0
 AND min_index_interval = 128
 AND read_repair_chance = 0.0
 AND speculative_retry = '99PERCENTILE';
cassandra@cqlsh:donutstore>

Let us add a row to the simple_donut table with the following CQL command: insert into simple_donut(name, price, ingredients) values ('plain donut', 1.50, {'sugar', 'flour'});.
While experimenting with the simple_donut table, it is totally OK to run a select name, ingredients, price from simple_donut; command to view the rows, as shown below. In a real-world enterprise Cassandra cluster, though, you would most certainly have to restrict such a query with some predicate, as we will see in the upcoming tutorials.


cassandra@cqlsh:donutstore> insert into simple_donut(name, price, ingredients) values ('plain donut', 1.50, {'sugar', 'flour'});
cassandra@cqlsh:donutstore> select name, ingredients, price from simple_donut;

 name        | ingredients        | price
-------------+--------------------+-------
 plain donut | {'flour', 'sugar'} |  1.50

(1 rows)
cassandra@cqlsh:donutstore>

This should be a good place to pause, so let us show how to quit the CQL shell by running the quit; command.


cassandra@cqlsh:donutstore> quit;
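
Before moving on, and given that the end goal of these tutorials is to access Cassandra from Scala, here is a minimal sketch of the same insert and select using the DataStax Java driver from Scala. It assumes driver 3.x (artifact com.datastax.cassandra:cassandra-driver-core), and we pass the default cassandra credentials because we previously enabled authenticator: PasswordAuthenticator.


import com.datastax.driver.core.Cluster
import scala.collection.JavaConverters._

object DonutStoreClient extends App {
  val cluster = Cluster.builder()
    .addContactPoint("127.0.0.1")
    .withCredentials("cassandra", "cassandra") // PasswordAuthenticator is enabled
    .build()
  val session = cluster.connect("donutstore")

  // The same insert we ran in cqlsh.
  session.execute(
    "insert into simple_donut(name, price, ingredients) values ('plain donut', 1.50, {'sugar', 'flour'})")

  // And the same select, iterating over the returned rows.
  session.execute("select name, ingredients, price from simple_donut").asScala.foreach { row =>
    val ingredients = row.getSet("ingredients", classOf[String]).asScala
    println(s"${row.getString("name")} | $ingredients | ${row.getDecimal("price")}")
  }

  session.close()
  cluster.close()
}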

 

Load CQL statements from file

In the preceding section, we showed how you can create a particular keyspace named donutstore, along with adding a table named simple_donut. Typically, in a real-world enterprise Cassandra cluster, you should not only version your CQL statements, but also have a way to reload (so to speak) the structures that make up your particular domain. For instance, you may need to spin up a fresh Cassandra node, and therefore have to walk through some initialization steps to add, say, your keyspace and its relevant tables.

 

To this end, the cqlsh utility which we previously discussed is not only an interactive shell, but can also be used to load a file that contains CQL statements. Let us consider creating a file named donutstore.cql, which will contain the CREATE KEYSPACE ... and CREATE TABLE ... statements for the donutstore keyspace and the simple_donut table, respectively. You will have to end each CQL statement with a semi-colon (;) as shown below.


CREATE KEYSPACE IF NOT EXISTS donutstore WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};

CREATE TABLE IF NOT EXISTS donutstore.simple_donut(name text, price decimal, ingredients set<text>, primary key(name));

Notice also that we've used the IF NOT EXISTS keywords so that the above-mentioned CQL statements are skipped should there already be a donutstore keyspace and/or a simple_donut table. As for the simple_donut table, you will have observed that we've explicitly prefixed the table name with donutstore., so that the table is created within the donutstore keyspace. As a matter of fact, should you omit the keyspace, you may run into the following Cassandra error when executing your CQL statement: No keyspace has been specified. USE a keyspace, or explicitly specify keyspace.tablename.

 

For the purpose of illustration, I've saved the donutstore.cql file somewhere on my file system at /Users/nbahadoor/Documents/nad/scripts/donutstore.cql. You are obviously free to save and/or version the CQL file under your desired directory. Next, you will have to navigate back to your Cassandra installation, and more precisely to its bin directory, where you will be able to run the cqlsh command. Unlike the previous code snippet on using cqlsh interactively, we'll instead pass a -f flag followed by the location of our CQL file. As a reminder, though, you'll also have to pass the login credentials for your Cassandra installation, since we've enabled authenticator: PasswordAuthenticator in the cassandra.yaml file.


./cqlsh -f /Users/nbahadoor/Documents/nad/scripts/donutstore.cql -u cassandra -p cassandra localhost

The above command will execute every CQL statement from the donutstore.cql file. This technique is especially useful when you are experimenting with the structures that will make up your particular domain in a given Apache Cassandra cluster.

 

Partition Keys

In the preceding simple_donut Cassandra table, you would have observed that we've used the name column as our primary key. If you are familiar with any relational database and have used SQL in the past, adding a primary key to a given table would of course feel like a natural thing to do. We should nonetheless always remind ourselves that Cassandra works as a cluster, or ring. Consequently, our data points can and will be scattered across the nodes that make up our ring. As a matter of fact, Cassandra is well-known for being able to store large amounts of data, and one of the key reasons for its ability to scale massively is the fact that you can easily add new nodes to your cluster.

 

Explaining the rebalancing process of a live Cassandra cluster, with nodes joining or being removed from the ring, would take more space than we have here, but there are a few important points relevant to our discussion on keys. The primary key is essentially the partition key for our data points, and it is used to locate the nodes within the ring that hold the data. With Cassandra, therefore, you commonly design your tables or data models according to the queries that you'd like to satisfy. In other words, you want to avoid shuffling data across a Cassandra cluster, and instead group certain data points to be stored together.

 

Let us assume that our donut products are being traded over some exchange, and as such a given donut would be priced multiple times during the day. We'll go ahead and create a new table named donut_price_by_day with the following columns: a name, a price date, a price timestamp and a price. You will have to log in to cqlsh as shown above and navigate to the donutstore keyspace that we've previously created.


create table donutstore.donut_price_by_day (
	name text, 
	price_date text, 
	price_ts timestamp, 
	price double,
	primary key ((name, price_date), price_ts)
);

Notice, however, that the primary key is defined by (name, price_date), followed by price_ts. The first part of the primary key, that is (name, price_date), is commonly referred to as a composite partition key. In the above example, we are obviously grouping, or breaking up, our data points into the prices for a particular day. You could equally group your data points by, say, month or year; how you model the grouping will be driven by the queries that you'd like to satisfy for your particular domain. The second part of the primary key definition, that is price_ts, is known as the clustering key, and its role is to sort the data points - by default, in ascending order.

 

Next, we craft some insert statements to add a few data points to the donut_price_by_day table. For the price_ts column, which requires a given timestamp, we'll use the handy toTimestamp(now()) function that is built into Cassandra.


insert into donut_price_by_day (name, price_date, price_ts, price) values ('plain donut', '2019-05-01', toTimeStamp(now()), 1.50);
insert into donut_price_by_day (name, price_date, price_ts, price) values ('plain donut', '2019-05-01', toTimeStamp(now()), 1.51);
insert into donut_price_by_day (name, price_date, price_ts, price) values ('plain donut', '2019-05-01', toTimeStamp(now()), 1.50);

After running the above insert statements in cqlsh, you will of course be able to query your data. In particular, we will execute the following CQL query: select name, price_date, price_ts, price from donut_price_by_day where name = 'plain donut' and price_date = '2019-05-01';. As might be expected, you should see the following output:


cassandra@cqlsh:donutstore> select name, price_date, price_ts, price from donut_price_by_day where name = 'plain donut' and price_date = '2019-05-01';

 name        | price_date | price_ts                        | price
-------------+------------+---------------------------------+-------
 plain donut | 2019-05-01 | 2019-05-26 16:57:24.686000+0000 |   1.5
 plain donut | 2019-05-01 | 2019-05-26 16:57:30.053000+0000 |  1.51
 plain donut | 2019-05-01 | 2019-05-26 16:57:33.925000+0000 |   1.5

Yet, we should take a pause and reflect for a minute on what just happened here. You could imagine running a large Cassandra cluster of, say, 100+ nodes. Even then, searching for the prices of the 'plain donut' product on the particular date of '2019-05-01' will be a pretty fast operation - the query will zero in precisely on the node that holds the data points.

 

Clustering keys

In this section, we take a closer look at the clustering key concept which we previously introduced. But first, let us consider the basic query of finding a particular donut by its name from the table simple_donut:


select name, ingredients, price from simple_donut where name = 'vanilla donut';

The name column was defined as the partition key, which means that the above-mentioned query is essentially turned into a particular hash that zeroes in precisely on the Cassandra node that holds the data. We will not dig deeper into the so-called hash at the moment, as we'll come back to it when we cover Cassandra tokens. Nonetheless, it is fair to say that the query will be pretty efficient, and the driver sketch below shows how you can even peek at this routing yourself.
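
The sketch is hedged: it assumes the 3.x Java driver, and relies on the fact that, for a single text partition key, the driver-serialized form is simply the key's UTF-8 bytes.


import java.nio.ByteBuffer
import com.datastax.driver.core.Cluster
import scala.collection.JavaConverters._

object ReplicaLookup extends App {
  val cluster = Cluster.builder()
    .addContactPoint("127.0.0.1")
    .withCredentials("cassandra", "cassandra")
    .build()
  cluster.init() // fetch cluster metadata without opening a session

  // Serialize the partition key as the driver would for a text column, then
  // ask the metadata which node(s) own that partition.
  val partitionKey = ByteBuffer.wrap("vanilla donut".getBytes("UTF-8"))
  val replicas = cluster.getMetadata.getReplicas("donutstore", partitionKey).asScala

  replicas.foreach(host => println(s"replica: ${host.getAddress}"))
  cluster.close()
}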

 

What if we wanted to search for all donuts that have the literal 'vanilla' as part of the set in the ingredients column? Working through this query with our relational hat on would naturally lead us to craft the query below. Notice, however, that when running the query, cqlsh will kindly remind us that we are in big data land, so to speak! In a real-world enterprise Cassandra setting, you are most certainly going to be dealing with multiple nodes that make up your Cassandra cluster or ring, and therefore such a query will be inefficient. In other words, a full scan will be required, as we don't know which partition, and hence which Cassandra node, owns the data points.


cassandra@cqlsh:donutstore> select name, ingredients, price from simple_donut where ingredients contains 'vanilla';
InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING"
cassandra@cqlsh:donutstore> 

You would of course not want to execute such a query in a production setting and with a large data set. Nevertheless, if you are just toying with your data points on a small data set in a non-production environment, such as development, test or similar, you can of course use the allow filtering keywords accordingly.


cassandra@cqlsh:donutstore> select name, ingredients, price from simple_donut where ingredients contains 'vanilla' allow filtering;

 name          | ingredients                   | price
---------------+-------------------------------+-------
 vanilla donut | {'flour', 'sugar', 'vanilla'} |     2

(1 rows)
cassandra@cqlsh:donutstore> 

You may be asking yourself what the above has to do with clustering keys. Using the describe table command, we can review the definition of the donut_price_by_day table. As a reminder, the price_ts column is indeed a clustering key, and you will also observe that Cassandra explicitly adds the ascending order: WITH CLUSTERING ORDER BY (price_ts ASC).


cassandra@cqlsh:donutstore> describe table donut_price_by_day ;

CREATE TABLE donutstore.donut_price_by_day (
    name text,
    price_date text,
    price_ts timestamp,
    price double,
    PRIMARY KEY ((name, price_date), price_ts)
) WITH CLUSTERING ORDER BY (price_ts ASC)
...

With the clustering price_ts column in place, we benefit from additional querying capabilities, such as range lookups. For instance, the query below finds the prices of all the plain donuts for the price_date of '2019-05-01', where the price_ts is within the following range: price_ts >= '2019-05-26 16:57:24' and price_ts <= '2019-05-30 16:57:24'.


cassandra@cqlsh:donutstore> select price from donut_price_by_day where name = 'plain donut' and price_date = '2019-05-01' and price_ts >= '2019-05-26 16:57:24' and price_ts <= '2019-05-30 16:57:24';

 price
-------
   1.5
  1.51
   1.5

(3 rows)
cassandra@cqlsh:donutstore>

You will certainly note that there were no warnings when executing the above Cassandra range query. The catch, however, is that we've provided the relevant partition key values that zero in on the Cassandra nodes holding the data. The subsequent range lookup, or filtering so to speak, is then also a fast operation. For the purpose of illustration, let us omit the partition keys and run the query.


cassandra@cqlsh:donutstore> select price from donut_price_by_day where price_ts >= '2019-05-26 16:57:24' and price_ts <= '2019-05-30 16:57:24';                      
InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING"

You will be presented with a similar ALLOW FILTERING warning to alert you of the full scan being performed. So far, the general theme that you might have spotted is that in Cassandra, you have to design your data model with your top queries in mind. Clustering keys are just one of those features that can be leveraged to get a free index, to some extent. We will cover indexing and other features, such as materialized views, shortly.

 

Cassandra CQL is not SQL!

Thus far, you will have noticed that Apache Cassandra's CQL does look and feel like the traditional SQL for querying relational databases. Yet, you should always keep at the back of your mind that Cassandra is a live cluster, with nodes constantly talking to each other (so to speak) using the underlying gossip protocol. Besides, with the numerous fine-grained configurations that can change its distributed-systems mechanics, it is tempting to criticise the platform should your queries not perform well, when the real culprit is often the data model.

 

Benchmarking Cassandra queries is an interesting topic of discussion, and there are certain steps and techniques that you can use if you are on the free open-source version. It goes without saying that with the DSE platform from the brilliant engineers at DataStax, you will be provided with additional benchmarking tools, as you'd expect from a well-established vendor. For now, though, we'll simply try not to carry over our traditional SQL techniques to CQL queries.

 

For the purpose of this illustration, we'll revisit the basic query from the previous code snippets - that of finding a particular donut item from the simple_donut table by pushing down a predicate to the name column (which was our partition key). Running the select name, ingredients, price from simple_donut where name = 'plain donut'; query in cqlsh will of course return the corresponding row.


cassandra@cqlsh:donutstore> select name, ingredients, price from simple_donut where name = 'plain donut';

 name        | ingredients        | price
-------------+--------------------+-------
 plain donut | {'flour', 'sugar'} |  1.50

(1 rows)
cassandra@cqlsh:donutstore> 

What if we were to mirror the above-mentioned query using an IN clause, such as select name, ingredients, price from simple_donut where name in ('plain donut');? Sure enough, our query will return the identical row.


cassandra@cqlsh:donutstore> select name, ingredients, price from simple_donut where name in ('plain donut');

 name        | ingredients        | price
-------------+--------------------+-------
 plain donut | {'flour', 'sugar'} |  1.50

(1 rows)
cassandra@cqlsh:donutstore> 

OK, you could argue that an IN clause is perhaps best suited for multiple values at the predicate level. Something like finding multiple donuts by their names: select name, ingredients, price from simple_donut where name in ('plain donut', 'strawberry donut');.


cassandra@cqlsh:donutstore> select name, ingredients, price from simple_donut where name in ('plain donut', 'strawberry donut');

 name             | ingredients                      | price
------------------+----------------------------------+-------
      plain donut |               {'flour', 'sugar'} |  1.50
 strawberry donut | {'flour', 'strawberry', 'sugar'} |  2.50

(2 rows)
cassandra@cqlsh:donutstore> 

And indeed the appropriate rows are returned when executing the above query. And yes, CQL has greatly improved the user experience when it comes to querying Cassandra, compared to the early days of using Thrift. Yet the above query is actually a disaster in the making in a real-world, large enterprise Cassandra cluster. Of course the query worked well for two values, but what if you had numerous values, such as where name in ('name-1', 'name-2', ...., 'name-1000') or more?

 

Before we can understand the above implications, we first need to review how Cassandra queries are executed. Your application will use a Cassandra driver that connects to the given Cassandra cluster. A query is sent to what is known as a coordinator node, which is responsible for executing it. Thereafter, the coordinator node will typically source the actual data points from the relevant replicas, as per your consistency and/or quorum configurations. So our basic IN clause query can potentially involve numerous round trips at the storage level before the coordinator can return the results, and that can be problematic for various reasons - the coordinator can fail, run out of memory, and so on. The team at DataStax presents a very nice diagram on the read requests of coordinator nodes that is certainly worth going through.

 

It goes without saying that DataStax provides more in-depth guides when it comes to Cassandra, whereas we will provide the basic introduction to Cassandra to help us write the corresponding Scala code. For the IN clause scenario, a better option from our Scala code would be to map and/or sequence a bunch of futures that break up the IN clause, and then push the individual queries down to the storage level of Cassandra, as sketched below. And yes, with a well-balanced Cassandra cluster, you should not be scared of sending multiple read requests!
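
Below is a minimal, hedged sketch of that idea, assuming the 3.x Java driver; the toScala bridge from the driver's ResultSetFuture to a Scala Future is our own helper, not a driver API.


import java.util.concurrent.Executor
import com.datastax.driver.core.{Cluster, ResultSet, ResultSetFuture, Row}
import scala.collection.JavaConverters._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object InClauseBreakup extends App {
  implicit val ec: ExecutionContext = ExecutionContext.global
  val executor: Executor = ExecutionContext.global // ExecutionContextExecutor is a java.util.concurrent.Executor

  val cluster = Cluster.builder()
    .addContactPoint("127.0.0.1")
    .withCredentials("cassandra", "cassandra")
    .build()
  val session = cluster.connect("donutstore")

  // Minimal bridge from the driver's ResultSetFuture to a Scala Future.
  def toScala(rsf: ResultSetFuture): Future[ResultSet] = {
    val promise = Promise[ResultSet]()
    rsf.addListener(new Runnable {
      def run(): Unit = promise.complete(Try(rsf.getUninterruptibly))
    }, executor)
    promise.future
  }

  // Instead of one big IN (...), fire one single-partition query per name,
  // letting each request be routed to the node(s) that own that partition.
  val names = Seq("plain donut", "strawberry donut")
  val perKeyResults: Seq[Future[ResultSet]] = names.map { name =>
    toScala(session.executeAsync("select name, ingredients, price from simple_donut where name = ?", name))
  }

  // Sequence the futures back into a single collection of rows for the caller.
  val allRows: Future[Seq[Row]] = Future.sequence(perKeyResults).map(_.flatMap(_.all().asScala))

  Await.result(allRows, 10.seconds).foreach(row => println(row.getString("name")))

  session.close()
  cluster.close()
}

With the driver's default token-aware routing, each single-partition query can be sent straight to a node that owns the data, rather than funnelling one giant IN clause through a single coordinator.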

 

Multiple clustering keys

We continue our journey in getting familiar with Cassandra's data modeling, and hence create a new table named yearly_donuts_by_user in the donutstore keyspace. As its name implies, this table will track the donuts bought by our users from various stores. Consequently, we will partition the table by the (userid, purchase_year) columns, such that at the storage level the data is co-located within our Cassandra cluster by userid and purchase_year bucket. While such a bucketing criterion seems fairly reasonable, in a real-world enterprise Cassandra cluster you would have to consider other factors, such as the frequency and the variance of your data points, among other things, in order to avoid hotspots within your Cassandra cluster.


create table donutstore.yearly_donuts_by_user (
	userid text, 
	purchase_year text,
	donut_name text,
	donut_price double,
	store_id text,
	purchase_dt text,
	primary key ((userid, purchase_year), donut_name, donut_price, purchase_dt)
);

Next, log in to cqlsh so that you can run the above CQL create table statement. You will have keenly observed that, unlike the previous discussion on clustering keys, we've now introduced multiple clustering keys: donut_name, donut_price and purchase_dt. At first sight, these should give us additional filtering capabilities on our data points - but we'll get to that shortly!

With the yearly_donuts_by_user table created, we can then add some data with the following CQL insert statements.


insert into yearly_donuts_by_user (userid, purchase_year, donut_name, donut_price, store_id, purchase_dt) values ('john_123', '2018', 'plain donut', 1.50, '111', '2018-05-08');

insert into yearly_donuts_by_user (userid, purchase_year, donut_name, donut_price, store_id, purchase_dt) values ('tony_555', '2017', 'vanilla donut', 2, '222', '2017-01-10');

insert into yearly_donuts_by_user (userid, purchase_year, donut_name, donut_price, store_id, purchase_dt) values ('sarah_666', '2018', 'plain donut', 1.55, '111', '2018-06-06');

insert into yearly_donuts_by_user (userid, purchase_year, donut_name, donut_price, store_id, purchase_dt) values ('sarah_666', '2018', 'chocolate donut', 2.25, '222', '2018-07-07');

Since we are merely running a single Cassandra node as our Cassandra ring, and only have a few dummy data points, let's run a "select all" query. It goes without saying that you should not run similar queries on your production Cassandra clusters.


cassandra@cqlsh:donutstore> select userid, purchase_year, donut_name, donut_price, purchase_dt, store_id from yearly_donuts_by_user;

 userid    | purchase_year | donut_name      | donut_price | purchase_dt | store_id
-----------+---------------+-----------------+-------------+-------------+----------
 sarah_666 |          2018 | chocolate donut |        2.25 |  2018-07-07 |      222
 sarah_666 |          2018 |     plain donut |        1.55 |  2018-06-06 |      111
  tony_555 |          2017 |   vanilla donut |           2 |  2017-01-10 |      222
  john_123 |          2018 |     plain donut |         1.5 |  2018-05-08 |      111

(4 rows)
cassandra@cqlsh:donutstore> 

The above worked great, but now we'd like to find all the donuts that were bought by the user sarah_666. Naturally, the table is not modeled to answer this particular query and, as a reminder, in Cassandra we typically model by query (so to speak). Therefore, you should see an error similar to the one below.


cassandra@cqlsh:donutstore> select userid, purchase_year, donut_name, donut_price, purchase_dt, store_id from yearly_donuts_by_user where userid='sarah_666';
InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING"
cassandra@cqlsh:donutstore> 

What we need instead is to query the yearly_donuts_by_user table by providing both the userid and the purchase_year predicates as mandated by the table's schema. Of course, running such a query will work just fine, and indeed outputs the donuts bought by the user sarah_666 during the year 2018.


cassandra@cqlsh:donutstore> select userid, purchase_year, donut_name, donut_price, purchase_dt, store_id from yearly_donuts_by_user where userid='sarah_666' and purchase_year='2018';

 userid    | purchase_year | donut_name      | donut_price | purchase_dt | store_id
-----------+---------------+-----------------+-------------+-------------+----------
 sarah_666 |          2018 | chocolate donut |        2.25 |  2018-07-07 |      222
 sarah_666 |          2018 |     plain donut |        1.55 |  2018-06-06 |      111

(2 rows)
cassandra@cqlsh:donutstore> 

So what about those multiple clustering keys that we added to the schema definition of the table yearly_donuts_by_user? Let's give another query a try! For instance, let's filter for all the donuts which were purchased on and after the '2018-07-05' date using the query: select userid, purchase_year, donut_name, donut_price, purchase_dt, store_id from yearly_donuts_by_user where userid='sarah_666' and purchase_year='2018' and purchase_dt >='2018-07-05';. Oops, that did not go well, and you should see an error message like the one below.


cassandra@cqlsh:donutstore> select userid, purchase_year, donut_name, donut_price, purchase_dt, store_id from yearly_donuts_by_user where userid='sarah_666' and purchase_year='2018' and purchase_dt >='2018-07-05';
InvalidRequest: Error from server: code=2200 [Invalid query] message="PRIMARY KEY column "purchase_dt" cannot be restricted as preceding column "donut_name" is not restricted"
cassandra@cqlsh:donutstore> 

As it turns out, we defined our multiple clustering keys in the table schema as follows: donut_name, donut_price, purchase_dt. Something new if you are coming from a purely relational database background is that the order of the clustering keys does matter! You cannot filter by the purchase_dt column without first restricting the donut_name and donut_price columns that precede it.
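
To make this concrete, under the original clustering order of donut_name, donut_price, purchase_dt, a range filter on purchase_dt is only accepted once both preceding clustering columns are restricted by equality - a minimal sketch follows, where the specific donut name and price are purely for illustration.


select userid, purchase_year, donut_name, donut_price, purchase_dt, store_id from yearly_donuts_by_user where userid='sarah_666' and purchase_year='2018' and donut_name='chocolate donut' and donut_price=2.25 and purchase_dt >= '2018-07-05';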

 

With model by query in mind, let's drop the table yearly_donuts_by_user (the drop statement is shown just below) and then create another one. But, this time around, we'll swap the order of the multiple clustering keys to: purchase_dt, donut_name, donut_price.
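
For completeness, the drop statement is a one-liner, sketched below with the optional if exists safety net.


drop table if exists donutstore.yearly_donuts_by_user;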


create table donutstore.yearly_donuts_by_user (
	userid text, 
	purchase_year text,
	donut_name text,
	donut_price double,
	store_id text,
	purchase_dt text,
	primary key ((userid, purchase_year), purchase_dt, donut_name, donut_price)
);

We naturally re-insert our dummy data points using the CQL insert statements below.


insert into yearly_donuts_by_user (userid, purchase_year, donut_name, donut_price, store_id, purchase_dt) values ('john_123', '2018', 'plain donut', 1.50, '111', '2018-05-08');

insert into yearly_donuts_by_user (userid, purchase_year, donut_name, donut_price, store_id, purchase_dt) values ('tony_555', '2017', 'vanilla donut', 2, '222', '2017-01-10');

insert into yearly_donuts_by_user (userid, purchase_year, donut_name, donut_price, store_id, purchase_dt) values ('sarah_666', '2018', 'plain donut', 1.55, '111', '2018-06-06');

insert into yearly_donuts_by_user (userid, purchase_year, donut_name, donut_price, store_id, purchase_dt) values ('sarah_666', '2018', 'chocolate donut', 2.25, '222', '2018-07-07');

With the new table in place, you should therefore be able to find all the donuts bought by the user sarah_666 in 2018, and filter for purchase_dt >='2018-07-05'.


cassandra@cqlsh:donutstore> select userid, purchase_year, donut_name, donut_price, purchase_dt, store_id from yearly_donuts_by_user where userid='sarah_666' and purchase_year='2018' and purchase_dt >='2018-07-05';

 userid    | purchase_year | donut_name      | donut_price | purchase_dt | store_id
-----------+---------------+-----------------+-------------+-------------+----------
 sarah_666 |          2018 | chocolate donut |        2.25 |  2018-07-07 |      222

(1 rows)
cassandra@cqlsh:donutstore> 

I hope that, so far, you are starting to observe that with Apache Cassandra you cannot simply write some awesome Scala code that will execute your queries against a particular Cassandra cluster - you need to have some basic understanding of Cassandra in general, as well as some of the key points surrounding Cassandra's data modeling.

 

Clustering key restrictions

Previously, we illustrated the importance of the order of clustering keys as part of your Cassandra table schema definitions. Let us review the last query from the preceding section, which found all the donuts bought by user sarah_666 in 2018, filtering for all orders that were purchased on or after 2018-07-05.


cassandra@cqlsh:donutstore> select userid, purchase_year, donut_name, donut_price, purchase_dt, store_id from yearly_donuts_by_user where userid='sarah_666' and purchase_year='2018' and purchase_dt >='2018-07-05';

 userid    | purchase_year | donut_name      | donut_price | purchase_dt | store_id
-----------+---------------+-----------------+-------------+-------------+----------
 sarah_666 |          2018 | chocolate donut |        2.25 |  2018-07-07 |      222

(1 rows)
cassandra@cqlsh:donutstore>

What if we'd also like to add one more filter, that of looking for only 'plain donut' items? It would naturally seem sensible to append the CQL predicate and donut_name = 'plain donut'. That is unfortunately yet another example of leaning on habits from relational database systems. Notice, however, that running such a CQL query will in fact trigger the error shown below.


cassandra@cqlsh:donutstore> select userid, purchase_year, donut_name, donut_price, purchase_dt, store_id from yearly_donuts_by_user where userid='sarah_666' and purchase_year='2018' and purchase_dt >='2018-07-05' and donut_name='plain donut';
InvalidRequest: Error from server: code=2200 [Invalid query] message="Clustering column "donut_name" cannot be restricted (preceding column "purchase_dt" is restricted by a non-EQ relation)"
cassandra@cqlsh:donutstore> 

So what's going on here? Let's give another query a try - this time filtering with and donut_price > 2.0.


cassandra@cqlsh:donutstore> select userid, purchase_year, donut_name, donut_price, purchase_dt, store_id from yearly_donuts_by_user where userid='sarah_666' and purchase_year='2018' and purchase_dt >='2018-07-05' and donut_price > 2.0;
InvalidRequest: Error from server: code=2200 [Invalid query] message="Clustering column "donut_price" cannot be restricted (preceding column "purchase_dt" is restricted by a non-EQ relation)"
cassandra@cqlsh:donutstore>

To our surprise, we get a similar error! While clustering keys can no doubt provide you with additional and efficient filtering capabilities, you should be aware that a range (non-EQ) filter is only allowed on the last clustering column you restrict - every clustering column before it must be restricted by equality. For instance, consider the query below where the range slice has been moved to the last clustering column, donut_price.


cassandra@cqlsh:donutstore> select userid, purchase_year, donut_name, donut_price, purchase_dt, store_id from yearly_donuts_by_user where userid='sarah_666' and purchase_year='2018' and purchase_dt ='2018-07-07' and donut_name = 'chocolate donut' and donut_price >= 2.0;

 userid    | purchase_year | donut_name      | donut_price | purchase_dt | store_id
-----------+---------------+-----------------+-------------+-------------+----------
 sarah_666 |          2018 | chocolate donut |        2.25 |  2018-07-07 |      222

(1 rows)
cassandra@cqlsh:donutstore> 

The query does work like a charm, but it may (or may not) be the query we'd like to run. So we again come back to the idea that with Cassandra, you typically design by query when mapping your domain to the underlying Cassandra data model.

 

Secondary index

By now, you should have a fairly good understanding of Cassandra's partition keys as well as its clustering keys. But, perhaps with our relational database hat on, you may wonder whether Cassandra supports any indexing capabilities. Well, the first one that is typically mentioned is the secondary index.

 

If you recall, the yearly_donuts_by_user table has a composite partition key made up of two columns: userid and purchase_year. We were therefore able to efficiently find the rows that matched both predicates in a CQL query. But what if we wanted to find rows by, say, just the userid column? It certainly seems like a sensible query - so let's give it a try in cqlsh.


cassandra@cqlsh:donutstore> select userid, purchase_year, purchase_dt, donut_name, donut_price, store_id from yearly_donuts_by_user where userid = 'sarah_666';
InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING"
cassandra@cqlsh:donutstore> 

Notice, however, that we get an error warning us of potentially unpredictable performance. In the relational database world, a similar warning would typically be resolved by creating an index (so to speak).
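
As the error message itself suggests, you could force the query through by appending ALLOW FILTERING - a minimal sketch is shown below. Be warned, though, that this makes Cassandra scan and filter data across partitions, which is rarely acceptable outside of toy datasets.


select userid, purchase_year, purchase_dt, donut_name, donut_price, store_id from yearly_donuts_by_user where userid = 'sarah_666' allow filtering;

A far better option is a Cassandra secondary index. To create a secondary index on the userid column, you can run the following create index... statement in cqlsh.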


cassandra@cqlsh:donutstore> create index userid_idx on donutstore.yearly_donuts_by_user (userid);

If you were to inspect Cassandra's logs, you would observe messages similar to the ones below that outline the secondary index creation steps.


INFO  [MigrationStage:1] 2019-06-06 21:04:26,071 ColumnFamilyStore.java:430 - Initializing donutstore.yearly_donuts_by_user.userid_idx
INFO  [SecondaryIndexManagement:3] 2019-06-06 21:04:26,078 CassandraIndex.java:707 - Submitting index build of userid_idx for data in BigTableReader(path='/Users/nbahadoor/Documents/nad/software/cassandra/apache-cassandra-3.11.4/data/data/donutstore/yearly_donuts_by_user-18f2d8b0855a11e98b7b8b893e6f97c9/md-1-big-Data.db')
cassandra@cqlsh:donutstore> INFO  [SecondaryIndexManagement:3] 2019-06-06 21:04:26,102 CassandraIndex.java:719 - Index build of userid_idx complete

To make sure that the secondary index was indeed created, you can run the describe table ... command from cqlsh. You will observe that the table yearly_donuts_by_user does in fact contain the CREATE INDEX ... definition, as shown below.


cassandra@cqlsh:donutstore> describe table yearly_donuts_by_user ;

CREATE TABLE donutstore.yearly_donuts_by_user (
    userid text,
    purchase_year text,
    purchase_dt text,
    donut_name text,
    donut_price double,
    store_id text,
    PRIMARY KEY ((userid, purchase_year), purchase_dt, donut_name, donut_price)
) WITH CLUSTERING ORDER BY (purchase_dt ASC, donut_name ASC, donut_price ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
CREATE INDEX userid_idx ON donutstore.yearly_donuts_by_user (userid);

With the secondary index on the userid column in place, let us re-run the query that filters rows by the userid column only. Unlike the initial attempt, there is no longer any error.


cassandra@cqlsh:donutstore> select userid, purchase_year, purchase_dt, donut_name, donut_price, store_id from yearly_donuts_by_user where userid = 'sarah_666';

 userid    | purchase_year | purchase_dt | donut_name      | donut_price | store_id
-----------+---------------+-------------+-----------------+-------------+----------
 sarah_666 |          2018 |  2018-06-06 |     plain donut |        1.55 |      111
 sarah_666 |          2018 |  2018-07-07 | chocolate donut |        2.25 |      222

(2 rows)
cassandra@cqlsh:donutstore> 

If we pause for a second, however, it is once again very important to remind ourselves that Cassandra works as a cluster or ring. That in itself should give us second thoughts about the efficiency of queries served by a secondary index. As a matter of fact, there are many factors to consider before creating and maintaining a successful secondary index in a large Cassandra cluster - along the lines of the shape of your data, the cardinality of the indexed column, and the rate of incremental updates to its data points, among others. It is certainly not a trivial exercise, and the DataStax team provides a general guideline on the use of secondary indexes, which should be an absolute must-read if you consider using them.

 

It is worth mentioning that with the DSE platform from DataStax, you get additional tooling capabilities around indexing your data points! Nonetheless, if you are just working with relatively small toy datasets in a non-production environment, a secondary index can be very handy.

 

Create materialized view

Adding a bunch of indices in order to satisfy certain query paths over Cassandra tables is no doubt one way to make your data points more searchable. As previously discussed, though, such a solution can be handy while, at the same time, bearing significant costs with regards to the maintenance of those indices. A better solution is to denormalize tables into varying shapes, and this can obviously be accomplished by manually creating a new table, as sketched below. Writes to this new table would also be manual, driven at the application layer.
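
As a quick illustration, a hand-rolled denormalized table keyed by store might look like the hypothetical sketch below - the table name donuts_by_store is ours, not part of the schema we have built so far. Your application would then be responsible for writing each purchase to both yearly_donuts_by_user and donuts_by_store.


-- hypothetical hand-maintained denormalized table, partitioned by store
create table if not exists donutstore.donuts_by_store (
	store_id text,
	userid text,
	purchase_year text,
	purchase_dt text,
	donut_name text,
	donut_price double,
	primary key ((store_id), userid, purchase_year, purchase_dt, donut_name, donut_price)
);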

 

There is yet another way of denormalizing data in Cassandra off a base table - that is, from some existing table. Cassandra offers materialized views, which are designed to automatically keep their data points in sync with the base table. This can be useful when you have to produce a report, pivot certain columns, or partition and/or query by a column that was not part of the primary key of the original table.

 

For the purpose of this illustration, we will once more refer to the table yearly_donuts_by_user. To remind ourselves of its schema definition, you can run the describe table yearly_donuts_by_user; command in cqlsh.


cassandra@cqlsh:donutstore> describe table yearly_donuts_by_user ;

CREATE TABLE donutstore.yearly_donuts_by_user (
    userid text,
    purchase_year text,
    purchase_dt text,
    donut_name text,
    donut_price double,
    store_id text,
    PRIMARY KEY ((userid, purchase_year), purchase_dt, donut_name, donut_price)
) WITH CLUSTERING ORDER BY (purchase_dt ASC, donut_name ASC, donut_price ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
CREATE INDEX userid_idx ON donutstore.yearly_donuts_by_user (userid);

Given the composite partition key - (userid, purchase_year) - a query which filters the data points by the userid and purchase_year columns will naturally be very efficient. In the example below, the query returns all the donuts bought by user sarah_666 in the purchase year 2018.


cassandra@cqlsh:donutstore> select userid, purchase_year, purchase_dt, donut_name, donut_price, store_id from yearly_donuts_by_user where userid='sarah_666' and purchase_year='2018';

 userid    | purchase_year | purchase_dt | donut_name      | donut_price | store_id
-----------+---------------+-------------+-----------------+-------------+----------
 sarah_666 |          2018 |  2018-06-06 |     plain donut |        1.55 |      111
 sarah_666 |          2018 |  2018-07-07 | chocolate donut |        2.25 |      222

(2 rows)
cassandra@cqlsh:donutstore> 

What if you were now asked to pull up all the rows for a given store, say those with store_id='111'? You obviously get an error similar to the one below, as the store_id column is not part of the partition key.


cassandra@cqlsh:donutstore> select userid, purchase_year, purchase_dt, donut_name, donut_price, store_id from yearly_donuts_by_user where store_id='111';
InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot execute this query as it might involve data filtering and thus may have unpredictable performance. If you want to execute this query despite the performance unpredictability, use ALLOW FILTERING"
cassandra@cqlsh:donutstore> 

A Cassandra materialized view might just do the trick here! The create materialized view statement from the base table donutstore.yearly_donuts_by_user is shown below. You will notice that the view is named donutstore.donuts_by_store_mv, and its data points will be co-located, or partitioned, by the store_id column. Note that the primary key of a materialized view must include every primary key column from the base table (plus, at most, one additional non-primary-key column - store_id in our case).


create materialized view if not exists donutstore.donuts_by_store_mv
as select store_id
from donutstore.yearly_donuts_by_user
where 
	userid is not null 
	and purchase_year is not null
	and purchase_dt is not null 
	and donut_name is not null 
	and donut_price is not null
	and store_id is not null
primary key ((store_id), userid, purchase_year, purchase_dt, donut_name, donut_price)
with comment = 'View of donuts by store from yearly_donuts_by_user' ;

After running the above-mentioned create materialized view statement in cqlsh, you should then be able to effectively search for all donuts by a particular store_id. For instance, you could query for all the donuts that were bought from the store_id = '111' using the CQL query: select store_id, userid, purchase_year, purchase_dt, donut_name, donut_price from donuts_by_store_mv where store_id = '111';.


cassandra@cqlsh:donutstore> select store_id, userid, purchase_year, purchase_dt, donut_name, donut_price from donuts_by_store_mv where store_id = '111';

 store_id | userid    | purchase_year | purchase_dt | donut_name  | donut_price
----------+-----------+---------------+-------------+-------------+-------------
      111 |  john_123 |          2018 |  2018-05-08 | plain donut |         1.5
      111 | sarah_666 |          2018 |  2018-06-06 | plain donut |        1.55

(2 rows)
cassandra@cqlsh:donutstore>
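
To see the automatic synchronization in action, you could write a brand new purchase to the base table only, and then immediately query the view again - a small sketch below, where the glazed donut purchase is dummy data of our own making.


-- hypothetical new purchase, written only to the base table
insert into yearly_donuts_by_user (userid, purchase_year, donut_name, donut_price, store_id, purchase_dt) values ('john_123', '2018', 'glazed donut', 1.75, '111', '2018-08-01');

-- the view picks it up without any extra application-level write
select store_id, userid, purchase_year, purchase_dt, donut_name, donut_price from donuts_by_store_mv where store_id = '111';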

When using materialized views, it is handy to visualize some of their default properties with the describe materialized view command.


cassandra@cqlsh:donutstore> describe materialized view donuts_by_store_mv;

CREATE MATERIALIZED VIEW donutstore.donuts_by_store_mv AS
    SELECT store_id, userid, purchase_year, purchase_dt, donut_name, donut_price
    FROM donutstore.yearly_donuts_by_user
    WHERE userid IS NOT NULL AND purchase_year IS NOT NULL AND purchase_dt IS NOT NULL AND donut_name IS NOT NULL AND donut_price IS NOT NULL AND store_id IS NOT NULL
    PRIMARY KEY (store_id, userid, purchase_year, purchase_dt, donut_name, donut_price)
    WITH CLUSTERING ORDER BY (userid ASC, purchase_year ASC, purchase_dt ASC, donut_name ASC, donut_price ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = 'View of donuts by store from yearly_donuts_by_user'
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

cassandra@cqlsh:donutstore> 

Furthermore, when inspecting the base table yearly_donuts_by_user, you will now observe that the CREATE MATERIALIZED VIEW ... definition for donuts_by_store_mv is shown alongside its schema definition.


cassandra@cqlsh:donutstore> describe table yearly_donuts_by_user ;

CREATE TABLE donutstore.yearly_donuts_by_user (
    userid text,
    purchase_year text,
    purchase_dt text,
    donut_name text,
    donut_price double,
    store_id text,
    PRIMARY KEY ((userid, purchase_year), purchase_dt, donut_name, donut_price)
) WITH CLUSTERING ORDER BY (purchase_dt ASC, donut_name ASC, donut_price ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';
CREATE INDEX userid_idx ON donutstore.yearly_donuts_by_user (userid);

CREATE MATERIALIZED VIEW donutstore.donuts_by_store_mv AS
    SELECT store_id, userid, purchase_year, purchase_dt, donut_name, donut_price
    FROM donutstore.yearly_donuts_by_user
    WHERE userid IS NOT NULL AND purchase_year IS NOT NULL AND purchase_dt IS NOT NULL AND donut_name IS NOT NULL AND donut_price IS NOT NULL AND store_id IS NOT NULL
    PRIMARY KEY (store_id, userid, purchase_year, purchase_dt, donut_name, donut_price)
    WITH CLUSTERING ORDER BY (userid ASC, purchase_year ASC, purchase_dt ASC, donut_name ASC, donut_price ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = 'View of donuts by store from yearly_donuts_by_user'
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

cassandra@cqlsh:donutstore> 

Within the "Big Data Land" though, things are not that straight-forward! Yes indeed that materialized views represent a handy feature that the current version of Cassandra offers. When applying them to your particular domain, you should weigh in on some of its intricacies: the additional cost on write operations, the typical frequency of updates to the base table that leads to Cassandra tombstones (we will cover this later), ...
