Sunday 26 February 2012

Cassandra Bulk Loader


Bulk loading into a Cassandra cluster used to be a difficult task. But if you have all the data to be inserted in place, loading it into Cassandra is a cakewalk with the utility introduced in Cassandra 0.8.1.
I tried inserting the data using the Cassandra Java client Pelops and the Python client Pycassa. But for the heavy data load we have (70 GB of text data), we found that neither of these libraries was up to the task.
After a lot of effort with Pycassa and Pelops we tried sstableloader, and this utility proved more robust and time efficient. The sstableloader utility loads existing or generated sstables into Cassandra.
In this post I am going to cover:
  1. Generating sstables
  2. Loading sstables with sstableloader
  3. Improving performance of generation and loading
  4. Tuning generation and loading to optimize the performance of Cassandra
  5. Advantages of the sstableloader utility over other bulk loading techniques
Sstableloader:
Cassandra 0.8.1 introduces a new tool, sstableloader, to load sstables into Cassandra, and this is the fastest way to insert data into Cassandra. (Later in this post we will see how to use sstableloader.)
sstableloader is a tool that, given a set of sstable data files, streams them to a live cluster. It does not simply copy the set of sstables to every node, but transfers only the relevant part of the data to each, conforming to the replication strategy of the cluster. So sstableloader needs the data to be in sstable format. Let's see how to generate sstables from raw or text data.

Why Use SSTable Generator?
sstableloader is the best way to insert data into Cassandra, and it requires the data in the form of sstables.
We have raw (text) data which is not in the form of sstables, so we need to create sstables from this raw data.
The simplest way to do this conversion is the new Java class SSTableSimpleUnsortedWriter, introduced in Cassandra 0.8.2.

1) Generating SSTables

SSTableSimpleUnsortedWriter API

org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter 

a) Creating  Writer :

Here we create the writer, which produces sstables for a given column family of a given keyspace.

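// UTF8Type is in org.apache.cassandra.db.marshal
SSTableSimpleUnsortedWriter columnFamilyWriter = new SSTableSimpleUnsortedWriter(
        directory,            // java.io.File: where the sstables are written
        keyspace,             // String: keyspace name
        "ColumnFamilyName",   // column family name
        UTF8Type.instance,    // column name comparator
        null,                 // sub-column comparator (null: no super columns)
        64);                  // buffer size in MB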
  • directory is where the sstables are written; it is a java.io.File object.
  • keyspace is the keyspace of the column family (a String).
  • Next come the column family name, the comparator and the sub-column comparator. Here we don't use super columns, so the sub-column comparator is null.
  • The last parameter is a "buffer" size in MB.
  • SSTableSimpleUnsortedWriter buffers whatever input it gets in memory and flushes everything into one sstable once the buffer size is reached.
  • That is, each resulting sstable will be approximately the size of the buffer.
b) Writing to sstables :

Here, using the above writer, we populate rows in the column family with the specified column names and values.

for (... each row, with its corresponding column and value ...)
{
    // bytes() converts a String into a ByteBuffer
    columnFamilyWriter.newRow(bytes(rowKey));
    columnFamilyWriter.addColumn(bytes("ColumnName"), bytes("ColumnValue"), timestamp);
}
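The post does not say where the bytes() helper in the loop above comes from. Presumably it is the static ByteBufferUtil.bytes from org.apache.cassandra.utils, but that is an assumption. As a minimal sketch of what such a helper does, here is a standard-library equivalent; the class name BytesSketch and the string() helper are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class BytesSketch {
    /** Encodes a String as UTF-8 and wraps the result in a ByteBuffer. */
    static ByteBuffer bytes(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    /** Decodes a buffer back into a String without moving its position. */
    static String string(ByteBuffer b) {
        return StandardCharsets.UTF_8.decode(b.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer key = bytes("row-1");
        // Prints the encoded length and the round-tripped value.
        System.out.println(key.remaining() + " bytes, decodes to " + string(key));
    }
}
```

If your row keys or values are not strings (for example longs or UUIDs), they need their own conversion to ByteBuffer that matches the validator or comparator of the column family.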

c) Writing to sstables with TTL :

TTL is time to live. It is given in seconds, and the column expires that many seconds after insertion.

for (... each row, with its corresponding column and value ...)
{
    // bytes() converts a String into a ByteBuffer
    columnFamilyWriter.newRow(bytes(rowKey));
    columnFamilyWriter.addExpiringColumn(bytes("ColumnName"), bytes("ColumnValue"), timestamp, ttl, expirationTimestampMS);
}

Note: expirationTimestampMS is the server-time timestamp used for actually expiring the column. It should be (insertion time in milliseconds + TTL in milliseconds).
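The arithmetic in the note is easy to get wrong because the TTL is in seconds while the expiration timestamp is in milliseconds. A small sketch of the conversion follows. The class and method names are illustrative, and using microseconds for the column timestamp is an assumption based on common Cassandra client practice, not something the post states.

```java
import java.util.concurrent.TimeUnit;

class ExpirationSketch {
    /** Insertion time (ms) plus the TTL converted from seconds to milliseconds. */
    static long expirationTimestampMs(long insertionTimeMs, int ttlSeconds) {
        return insertionTimeMs + TimeUnit.SECONDS.toMillis(ttlSeconds);
    }

    public static void main(String[] args) {
        int ttl = 3600;                               // one hour, in seconds
        long nowMs = System.currentTimeMillis();      // insertion time in ms
        long timestamp = nowMs * 1000;                // assumed: column timestamp in microseconds
        long expirationTimestampMS = expirationTimestampMs(nowMs, ttl);

        System.out.println("timestamp=" + timestamp
                + " ttl=" + ttl
                + " expirationTimestampMS=" + expirationTimestampMS);
    }
}
```

The three values computed in main are the last three arguments of the addExpiringColumn call shown earlier.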
 
d) Closing the writer :
columnFamilyWriter.close();
 
Things to remember when compiling and running the sstable generation program:
To compile the program, the Cassandra jar (>= 0.8.2) needs to be on the classpath (javac -cp <path_to>/apache-cassandra-0.8.2.jar DataImportExample.java). To run it, the Cassandra jar needs to be present along with the jars of the libraries used by Cassandra (those in the lib/ directory of the Cassandra source tree). Valid cassandra.yaml and log4j configuration files should also be accessible; typically, this means the conf/ directory of the Cassandra source tree should be on the classpath. As of 0.8.2, you will need to set the data_file_directories and commitlog_directory directives in that cassandra.yaml to accessible directories, but not ones of an existing Cassandra node. (This will be fixed in 0.8.3, but in the meantime using /tmp for both is a good idea.) The only property that actually matters to SSTableSimpleUnsortedWriter is the partitioner you want to use.
Once you have the sstables in place, the next step is to load them into the Cassandra cluster. This is where sstableloader comes into the picture.

2) Loading sstables with sstableloader
Configuration :
  1. To learn the topology of the cluster (the number of nodes, which key ranges each node is responsible for, the schema, etc.), sstableloader uses the Cassandra gossip subsystem. It thus requires a directory containing a cassandra.yaml configuration file on the classpath.
  2. In this config file, listen_address, storage_port, rpc_address and rpc_port should be set correctly to communicate with the cluster, and at least one node of the cluster you want to load data into should be configured as a seed. The rest is ignored for the purposes of sstableloader.
  3. Note that the schema for the column families to be loaded should be defined beforehand, using your preferred method: CLI, Thrift or CQL.
  4. Remove the initial_token setting in cassandra.yaml.
  5. Change listen_address and rpc_address in cassandra.yaml to the private IP of the loader machine.
Running sstableloader :
You need Cassandra 0.8.1 or newer to run sstableloader (we are using Cassandra 0.8.2).
  1. With sstableloader, you first need the sstables. Only the -Data and -Index components are required; the others (-Statistics and -Filter) will be ignored.
  2. These sstables have to be in a directory whose name is the name of the keyspace of the sstables. This is how they are stored in either the main data directory or a snapshot.
    For example, if TestKeyspace is the name of our keyspace, all the sstables must be in a directory named TestKeyspace.
  3. Then, assuming sstableloader is configured to talk to your multi-node cluster:
        Go to CASSANDRA_HOME/bin and run
        $ ./sstableloader TestKeyspace
        This starts loading the sstables into the cluster that sstableloader is configured to talk to.
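Before starting a long load it can be worth checking the two requirements listed above: the directory is named after the keyspace, and every -Data component has its -Index component. The sketch below is a hypothetical pre-flight check, not part of Cassandra. It assumes component files end in "-Data.db" and "-Index.db", which is how sstable files are named on disk.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

class SSTableDirCheck {
    static final String DATA = "-Data.db";
    static final String INDEX = "-Index.db";

    /** Returns the -Index.db names that are missing for the given -Data.db names. */
    static List<String> missingIndexFiles(Collection<String> fileNames) {
        List<String> missing = new ArrayList<>();
        for (String name : fileNames) {
            if (!name.endsWith(DATA)) {
                continue; // only -Data files need a matching -Index file
            }
            String index = name.substring(0, name.length() - DATA.length()) + INDEX;
            if (!fileNames.contains(index)) {
                missing.add(index);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Hypothetical usage: java SSTableDirCheck <directory> <keyspace>
        File dir = new File(args.length > 0 ? args[0] : "TestKeyspace");
        String keyspace = args.length > 1 ? args[1] : "TestKeyspace";

        if (!dir.getName().equals(keyspace)) {
            System.out.println("Directory name does not match keyspace " + keyspace);
        }
        String[] names = dir.list(); // null if dir is not a readable directory
        List<String> files = names == null ? new ArrayList<String>() : Arrays.asList(names);
        System.out.println("Missing index files: " + missingIndexFiles(files));
    }
}
```

An empty "Missing index files" list together with a matching directory name means the directory satisfies both requirements; it says nothing about whether the schema exists on the cluster.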
3) Improving performance of generation and loading

  1. If JVM and GC tuning is not done properly, you will not experience the power of this utility. When running the generation program, provide appropriate -Xmx and -Xms values. Because the writer keeps accumulating data on the heap until the buffer size is reached, providing sufficient heap is essential. Too small a heap can result in an OutOfMemoryError.

    java -ea -cp $CLASSPATH -Xms3000M -Xmx3000M \
    -Dlog4j.configuration=log4j-tools.properties \
    your.program.for.SstableGenerator

    I got better results with 3000 MB of heap for a 300 MB buffer.
  2. It is also essential to write each row once, followed by all of its columns, rather than calling newRow() repeatedly for the same row with a different column each time. To do that, you can iterate over a certain number of rows, collect the columns for each row in a collection such as a Map, and then call newRow() once followed by addColumn() for every column of that row. I observed a performance improvement of about 50% with this approach.
    For example:

    for (... each row and corresponding columns ...)
    {
        // one newRow() call per row, then every column of that row
        columnFamilyWriter.newRow(bytes(uuid));
        columnFamilyWriter.addColumn(bytes("firstname"), bytes(entry.firstname), timestamp);
        columnFamilyWriter.addColumn(bytes("lastname"), bytes(entry.lastname), timestamp);
        columnFamilyWriter.addColumn(bytes("password"), bytes(entry.password), timestamp);
        columnFamilyWriter.addColumn(bytes("age"), bytes(entry.age), timestamp);
        columnFamilyWriter.addColumn(bytes("email"), bytes(entry.email), timestamp);
    }
  3. Also allocate sufficient heap to sstableloader when loading. I observed better performance with 4000 MB of heap for 60 GB of sstables.
    In our case 60 GB of sstables were loaded in 20-25 minutes.
    To set -Xmx and -Xms for sstableloader:

        a. Open the file "CASSANDRA_HOME/bin/sstableloader.sh"
        b. Search for
            $JAVA -ea -cp $CLASSPATH -Xmx256M \
            -Dlog4j.configuration=log4j-tools.properties \
            org.apache.cassandra.tools.BulkLoader "$@"
        c. In that part of the file, change -Xmx and add -Xms. Use the same value for both options.
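Point 2 above (one newRow() call per row, then all of its columns) can be sketched with plain collections. The RowSink interface below is a hypothetical stand-in for the two writer methods used in this post, so that the sketch runs without the Cassandra jar. In a real generator you would call the writer instead and process the input in batches that fit in memory.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RowGroupingSketch {
    /** Hypothetical stand-in for the writer's newRow/addColumn calls. */
    interface RowSink {
        void newRow(String rowKey);
        void addColumn(String name, String value, long timestamp);
    }

    /** Groups (rowKey, columnName, value) triples so each row key appears once. */
    static Map<String, Map<String, String>> group(String[][] triples) {
        Map<String, Map<String, String>> rows = new LinkedHashMap<>();
        for (String[] t : triples) {
            Map<String, String> columns = rows.get(t[0]);
            if (columns == null) {
                columns = new LinkedHashMap<>();
                rows.put(t[0], columns);
            }
            columns.put(t[1], t[2]);
        }
        return rows;
    }

    /** Emits each row once, followed by all of its columns. */
    static void write(Map<String, Map<String, String>> rows, RowSink sink, long timestamp) {
        for (Map.Entry<String, Map<String, String>> row : rows.entrySet()) {
            sink.newRow(row.getKey());
            for (Map.Entry<String, String> col : row.getValue().entrySet()) {
                sink.addColumn(col.getKey(), col.getValue(), timestamp);
            }
        }
    }

    public static void main(String[] args) {
        // The columns of row "u1" arrive interleaved with those of row "u2".
        String[][] input = {
            {"u1", "firstname", "Ann"}, {"u2", "firstname", "Bob"}, {"u1", "lastname", "Lee"}
        };
        final int[] newRowCalls = {0};
        RowSink counting = new RowSink() {
            @Override public void newRow(String rowKey) { newRowCalls[0]++; }
            @Override public void addColumn(String name, String value, long timestamp) { }
        };
        write(group(input), counting, System.currentTimeMillis() * 1000);
        System.out.println("newRow calls: " + newRowCalls[0]); // 2, one per distinct row key
    }
}
```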
4) Tuning generation and loading to optimize the performance of Cassandra
  1. Choosing an appropriate buffer size is a very important part of sstable generation.
  2. A small buffer size results in faster generation and loading, but it puts load on Cassandra, so it should not be set too low.
  3. With a small buffer size, a large number of small sstables are generated; once loaded into Cassandra, they take a long time to compact and put load on the cluster.
  4. If the size of a generated sstable is "X" before loading, then the size of the sstable you will find after loading into Cassandra is
    (X * replication_factor) / (number_of_nodes_in_cluster)

    As the number of nodes increases, the sstables become smaller. This will help you decide the buffer size according to your cluster size.
  5. You can also turn minor compaction off while loading data into Cassandra for faster loading. (You can do this with nodetool setcompactionthreshold.)
  6. While loading is in progress, the data becomes available only after the indexes have been rebuilt (this does not take much time).
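To get a feel for the trade-off in points 2 to 4, the formula can be turned into a small calculator. This is only a rough sketch: it assumes each generated sstable is about the size of the buffer, as stated earlier, and the replication factor and node count used in main are made-up example values.

```java
class SSTableSizingSketch {
    /** Approximate number of generated sstables: total size / buffer size, rounded up. */
    static long estimatedSSTableCount(long totalSSTableMb, long bufferMb) {
        return (totalSSTableMb + bufferMb - 1) / bufferMb;
    }

    /** Size of one sstable after loading: (X * replication_factor) / number_of_nodes. */
    static double sizeAfterLoadingMb(double generatedMb, int replicationFactor, int nodes) {
        return generatedMb * replicationFactor / nodes;
    }

    public static void main(String[] args) {
        long totalMb = 140L * 1024;   // 140 GB of generated sstables, as in this post
        int replicationFactor = 3;    // example value, not from the post
        int nodes = 6;                // example value, not from the post

        for (long bufferMb : new long[] {64, 300}) {
            System.out.println("buffer " + bufferMb + " MB: about "
                    + estimatedSSTableCount(totalMb, bufferMb) + " sstables, each about "
                    + sizeAfterLoadingMb(bufferMb, replicationFactor, nodes)
                    + " MB per node after loading");
        }
    }
}
```

A larger buffer means fewer, larger sstables for the cluster to compact, at the cost of more heap during generation.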
5) Advantages of the sstableloader utility over other bulk loading techniques
  1. It does not put much load on the loader machine or on the Cassandra cluster.
  2. In our experience it is faster than the other loading methods we tried.
  3. With other bulk loading techniques you keep hitting Cassandra the whole time. With sstableloader, generating the sstables puts no load on the Cassandra cluster (provided you do not generate them on a machine that is running Cassandra).
  4. In our case it took 3 hours to generate 140 GB of sstables from 70 GB of text data, and loading completed in just an hour. That is, we hit the Cassandra cluster for only an hour to load the data.