Increasing Cassandra Capacity for the Holidays Without Adding Nodes

  This is a  blog post by Jorge Rodriguez, Engineer in the Distributed Data Platform Team at Bloomreach.

Introduction

In our previous blog post, “Global Cassandra on AWS EC2 at BloomReach,” we discussed the strategy the data platform team used and the decisions we made when launching a global Cassandra cluster at BloomReach. If you have not read that post, it may be a good idea to read that first.

At BloomReach we have a great site-search product, which is now integrated on a number of high-profile e-commerce websites.  It is called Search, Navigation and Personalization (SNAP).  Beginning the week of Thanksgiving until the end of the year, the peak volume of requests that our SNAP product processes is expected to skyrocket  to two or three times the normal workload.  While expanding capacity for many of our API’s by simply adding hardware is a trivial matter, at our Cassandra database layer, this is a non-trivial operation.  As mentioned in the previous post, this is an operation that usually takes six hours for every instance we add.

 For this second installment of our four-part series, we will discuss how we increased the throughput of our front-end data centers by 4x  for the holiday shopping season, without adding any additional nodes.

A tough requirement

It was about two months back when we started mapping out our capacity requirements for the holiday season. Some things we took into consideration were:

  • We like our systems to have a peak load of no more than 50% of the known capacity.
  • During the holiday season we expect to receive four times our normal peak workload.
  • Our Cassandra infrastructure is already one of the most expensive infrastructures we are running at BloomReach.

Given these considerations, solving this problem by brute force would eat away at our profit margins.  As many may know, having healthy profit margins is critical for a startup’s valuation.   Instead, of brute force, we decided to re-evaluate our resources and look for optimizations that would make better usage of what we had already provisioned.

Resource Utilization

We began by identifying two key factors:

  1. What is binding our front-end read throughput?
  2. What are our current front-end resources and how are they utilized?

The answer to what is binding our front-end read throughput was simple.  We have a hard SLA of 125ms TP95 latencies for 50 key Cassandra lookups, but we like to operate at no more than half of that.  Our servers could technically handle a significantly higher workload than what we consider our capacity, however, once our known capacity is exceeded our latencies suffer.

We also know that there is a direct correlation between CPU usage on the front-end nodes and the latency.  When our front-end nodes exceed 40- to 45-percent CPU utilization, our latencies exceed our hard SLA requirement.  Our analysis showed that the CPU utilization was impacted by three main factors:

  1. Replication load from the back end.  As discussed in our previous post, our back-end pipelines generate the data that we serve.  These pipelines write to our back-end data center and immediately upon being written the data is replicated to three front-end nodes (since our replication factor is 3).  This replication load requires resources of course, so as the replication load increases, so do our latencies.
  2. Time spent doing disk IO operations. When reading data from an SSTable on disk, the processor needs to wait for the file to be read.  Similarly the same applies to writing data to disk.  Of course the use of SSD drives decrease this seek time, but it is nonetheless present.  This is why it’s extremely difficult to run Cassandra on a spinning disk and using SSD drives is highly recommended.
  3. Read request workload from our API’s.  This one is pretty obvious, as the number of read requests increases, so does the number of CPU cycles required to process those requests.

We then proceeded to look at our current resources and how they were being utilized.  As previously mentioned, we run on AWS EC2, and we use i2.xlarge instances for our Cassandra nodes.  These instances are storage optimized and feature:

  • 4 High Frequency Intel Xeon E5-2670 v2 (Ivy Bridge) Processors.
  • 30GB of RAM.
  • 800GB SSD drives.
  • A steep cost of  7 cents per hour.

For the most part, we were making good use of our resources.  Though there was a clear gap in the memory usage, we were only using 12GB of the 30GB available.  What a waste!  We knew we could do more with what we had; we simply had to figure out how.

Key Cassandra Concepts

In order to understand our approach to solving this problem, there are two key Cassandra concepts that should be understood.  I’ve simplified these concepts to the scope required to understand the optimizations that we made.

  1. The first is the Cassandra Token Range
  2. The second concept is the Cassandra Read Path

Cassandra Token Range

Cassandra is a distributed system with a masterless architecture.  As such, it needs a way to distribute the data amongst its nodes.  

  • Each node in a Cassandra cluster is assigned a token range, which is a range of hashes defined by a partitioner.
  • This range is defined by any possible long value.
  • So the possible tokens lie between -9223372036854775808 to 9223372036854775807.
  • Every key, when run through the partitioner hashes to a value in this range.

The image below represents an eight node Cassandra cluster.  You can think of the circumference of the circle as the token range and each node is responsible for it’s chunk of the values.  

pasted image 0

Please note this is a simplified version of the token range.  Since we use virtual nodes, there are actually 8 x 32 token ranges to distribute amongst these nodes.  But that additional complexity is beyond the scope of required knowledge to understand our approach.

Cassandra Read Path

Assuming the same eight-node cluster, let’s consider what happens when a client sends a read request to Cassandra.

pasted image 0 (1)

  1. Firstly, the client will pick a node to which it will send the read request.  This node will become the “coordinator” for the request.  In this example, node 8 receives the read request for keys A,B,C and D.
  2. The coordinator node will then run the keys through the partitioner to determine the token value and will send a request to the nodes which own the range this token falls under.  So in this case, the coordinator node will:
    1. Send one message to node 7 for key C.
    2. Send one message to node 4 for key B.
    3. Send two messages to node 2, for keys A and D.
  3. For each of these messages, the following interactions happen.  Pictured below, the red colored boxes represent memory operation and the dark blue colored boxes represent Disk I/O operation.

pasted image 0 (2)

    1. The MessagingService in the coordinator node will send a message to the node which has the required key.  Let’s call it the “owner” node.
    2. The owner node will then:
      1. Look for that particular key in the memtable, which contains recent writes.
      2. Look for that key in all of the SSTables for the given keyspace and column family.
      3. Merge all of the results to determine the most recent data for the key.
      4. Respond to the Coordinator node through the MessagingService.
    3. If any owner node is taking too long to respond, the Coordinator node will make a speculative retry.
      1. It will continue to wait for a response from the owner node, but at the same time it will send the same request to another node, which also has the same data (assuming the replication factor is > 1).
      2. The speculative retry timeout is configurable at the column family level.  It can be configured to:
        1. ALWAYS – Retry reads of all replicas.
        2. Xpercentile – Retry reads based on the percentile latency.
        3. Yms – Retry reads after specified milliseconds.
        4. NONE – Do not retry reads.
      3. The default value is 99th percentile.  But we discovered early on that this value is not aggressive enough for us, and having one or two nodes experiencing high latencies would cause us to exceed our desired performance.
      4. We configured our column families to use a 15ms speculative retry.  From our testing, this ensured good performance while not overburdening the nodes.

Once all of the owner nodes have responded with the required data, the Coordinator node will respond to the client with the results.

Targeting Queries

At BloomReach, we had already developed what we called the Quality of Service API.  This API was designed to make our Cassandra data accessible to some of our applications, though it was not yet the primary source of serving infrastructure for our SNAP application.  

One of the things we noticed through some experimentation was that the Coordinator nodes were doing a lot of work.  And that much of the CPU usage was related to the coordination work being done.  The storage proxy on the coordinator node had to:

  1. Parse the query and determine which nodes own the data.
  2. Send one message through the messaging service for each key being received.
  3. Wait for responses to all of these messages, and potentially do speculative retries.
  4. Respond to the client with the results.
  5. Do all of this work with a limited number of available threads.

We had the idea to reduce this workload by creating a smarter client.  This client would group the keys being requested for a query into the appropriate token range and then send the request directly to a node which owns that range.  The benefits of this approach were:

  1. Reduced the number of keys per individual query.
  2. Most reads occur locally:
    1. Avoids sending a message per key to peers in the data center.
    2. Avoids having to wait for the responses from peers.
    3. Our network resources were not overly utilized, but there is certainly CPU overhead of all of these network operations.  For a 50-key lookup there are 50 messages to send and await responses from.
    4. Speculative retries are a rare occurrence.
  3. Moved much of the sending and receiving of messages to a service where we have significantly more threads to deal with these operations.

pasted image 0 (3)

Additionally, we run our QOS API on nodes which are CPU optimized.  This makes the cost per CPU core 1/5th of the cost per CPU on our Cassandra nodes.  With this change, we observed a 50% increase in our capacity and it was accompanied by a nice boost in performance as well.  While that was not our primary goal, it was certainly a very positive impact.

Putting our RAM to work

The second change we made was to make better utilization of our memory to reduce CPU load.  For this change, we simply had to leverage a feature of Cassandra which we were not leveraging, the Row Cache.

Through some calculations, we determined that we could fit 35 percent of our relevant data into memory by allocating 10GB of memory per node to the row cache.  Additionally, the cache hit ratio was extremely high at 80 to 85 percent!  This is because the most popular products are the most sought after.  As illustrated by the updated MessagingService Diagram below, introducing the row cache avoids having to look on disk for the data in favor of retrieving it from memory.

pasted image 0 (4)

Serving the data from memory reduces the CPU impact, which was our primary goal.  It also has the added benefit of being much faster.  So again, we sought to improve our throughput and we got a nice performance boost as well!

Results

After we switched our SNAP API’s to retrieve data through our QOS API and enabled the Cassandra row cache, we re-ran our load tests.  And we were extremely pleased with the results.

  • We had achieved our goal of four times our existing capacity.
  • We had added no additional instances.
  • We improved our average latencies by 50 percent, without even trying!  

The only dangerous thing here is that we set a precedent, which we now have to live up to.  When we focus on performance improvements, we’ll have to deliver these kinds of results to make our management team happy :).

Conclusion

We learned some important lessons through this exercise:

  1. It is really good practice to monitor all system metrics and to have a very good grasp on your resource utilization.
  2. Before going off and expanding your infrastructure, have a good understanding of why it is necessary.  This means understanding what binds your throughput and considering ways to improve that bottleneck.
  3. If operating in the cloud, select your instance types carefully and thoughtfully.  Understand what drives the cost of the instance and pick instances according to your true requirements.  Usually this will take some performance and load-test cycles to get right.

We hope you have enjoyed this blog post.  Please stay tuned, as we have two more posts from the BloomReach Data Platform team coming in the next few weeks.

The third installment, “Distributed QPS Monitoring and Throttling System,” will describe a system we built in order to improve the stability of our cluster by preventing it from being issued more operations than it can support.

The fourth and final installment will describe our Elastic Cassandra solution.  That’s how we solved the problem of horizontally scaling the number of concurrent MapReduce pipelines that we can run at any given time in order to support the elastic nature of MapReduce to it’s full capacity.

 
Nitin Sharma

Nitin Sharma

 
  • Gennady Nurik

    Can you provide more info/code regarding how you achieved – “This client would group the keys being requested for a query into the appropriate token range and then send the request directly to a node which owns that range.”

    • Jorge I Rodriguez

      Hi Gennady, apologies for the delayed response as I just saw your comment.
      The client maintains token range ownership information in memory, and it refreshes it periodically. For any given key, it can be ran through the partitioner to get the token value. Hope that helps answer your question. You can email me at jorge at bloomreach dot come if you’re interested in more details.