This is a blog post by Jorge Rodriguez, Engineer in the Distributed Data Platform Team
The BloomReach platform team, which builds the infrastructure that runs our powerful products, was given a job to do: create a datastore platform capable of supporting heterogeneous workloads. This platform would be used across all applications at BloomReach and implement the following high-level requirements:
- Support back-end pipelines (mostly map-reduce jobs) performing scans, lookups and writes with varying throughput requirements and SLAs.
- Support front-end applications, which require data produced by back-end pipelines in real-time to be served at low latencies to locations around the globe.
- Provide a platform that effortlessly scales to hundreds of customers with millions of pages and across millions of users.
How we started
We opted for Cassandra for our distributed database system. We felt it best suited our requirements. It features:
- Linear scalability with a decentralized architecture with no masters.
- Proven fault tolerance on cloud infrastructure.
- Cross-data-center replication technology.
- All our required features are available in the open source version.
We started with a single Cassandra cluster, which we set up on AWS EC2. For our initial configuration we:
- Used the SSD storage optimized i2.xlarge instance type.
- Assembled it with a network topology of a single back-end data center to support back-end pipelines and multiple front-end data centers to support our low-latency APIs.
- Enabled vnodes for the automated token range management.
- Used the DataStax Java driver over the native protocol and configured our clients with round-robin, token-aware and data-center-aware load-balancing policies to route each request to the appropriate data center and node.
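To make the routing idea concrete, here is a minimal, self-contained sketch of data-center-aware round-robin selection. It is not the driver's code: in the DataStax Java driver this behavior comes from its TokenAwarePolicy and DCAwareRoundRobinPolicy classes. The class and field names below are hypothetical, and token awareness (preferring the replica that owns the partition key) is deliberately left out to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only; this is not the DataStax driver's implementation.
final class DcAwareRoundRobinSketch {

    // Hypothetical value type: a node address plus the data center it lives in.
    static final class Host {
        final String address;
        final String dataCenter;

        Host(String address, String dataCenter) {
            this.address = address;
            this.dataCenter = dataCenter;
        }
    }

    private final List<Host> localHosts = new ArrayList<>();
    private final AtomicInteger counter = new AtomicInteger();

    // Keeps only the hosts that belong to the client's own data center.
    DcAwareRoundRobinSketch(String localDataCenter, List<Host> allHosts) {
        for (Host host : allHosts) {
            if (host.dataCenter.equals(localDataCenter)) {
                localHosts.add(host);
            }
        }
        if (localHosts.isEmpty()) {
            throw new IllegalArgumentException("No hosts in data center " + localDataCenter);
        }
    }

    // Returns the next local host, cycling through them in order.
    Host nextHost() {
        int index = Math.floorMod(counter.getAndIncrement(), localHosts.size());
        return localHosts.get(index);
    }

    public static void main(String[] args) {
        List<Host> hosts = Arrays.asList(
                new Host("10.0.0.1", "us-east-1-frontend"),
                new Host("10.0.0.2", "us-east-1-backend"),
                new Host("10.0.0.3", "us-east-1-frontend"));
        DcAwareRoundRobinSketch policy = new DcAwareRoundRobinSketch("us-east-1-frontend", hosts);
        for (int i = 0; i < 3; i++) {
            System.out.println(policy.nextHost().address);
        }
    }
}
```

A front-end client built this way never sends requests to the back-end data center, which is the isolation the single-cluster, multi-data-center layout relies on.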
We considered using separate back-end and front-end clusters but decided against it, mostly because our front-end APIs must reflect changes made by back-end pipelines in real time. To get this with separate clusters, we would have had to build the replication and maintain consistency across the clusters ourselves. Cassandra’s cross-data-center replication takes care of these things for us. It also greatly simplified the requirement for our data to be made available globally for our front-end applications.
BloomReach is a company with customers around the globe, and our customers expect operational excellence, which includes great performance and high availability for our front-end applications. Launching in a single AWS EC2 region would only provide us with the availability SLA of that region, which is only 99.95%. Additionally, in order to achieve our latency goals, we needed to locate our services as close as possible to our customers to reduce network overhead.
We decided to launch our cluster with front-end data centers across AWS regions in the United States (east and west) and in Europe (Ireland). Our cluster additionally supports expansion into any AWS region. By providing front-end data centers in multiple regions, our services become highly available, as all three of these regions would need to go down in order for us to suffer an outage. Additionally, we can serve from the closest location to our end users.
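As a back-of-the-envelope check on that claim, the sketch below computes the combined availability of three regions. It rests on two assumptions that are not from our measurements: each region is available 99.95% of the time, and regional outages are independent of one another. Real outages can be correlated, so treat the result as an upper bound rather than a guarantee.

```java
// Illustrative arithmetic only; assumes independent regional failures.
final class AvailabilitySketch {

    // Probability that at least one of `regions` independent regions is up.
    static double combinedAvailability(double perRegionAvailability, int regions) {
        double allDown = Math.pow(1.0 - perRegionAvailability, regions);
        return 1.0 - allDown;
    }

    public static void main(String[] args) {
        double single = 0.9995; // the 99.95% regional figure quoted above
        System.out.println("One region:    " + combinedAvailability(single, 1));
        System.out.println("Three regions: " + combinedAvailability(single, 3));
    }
}
```

Under these assumptions the chance of all three regions being down at once is 0.0005 cubed, or about 1.25 in ten billion.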
A snitch informs Cassandra about the network topology of the cluster. It specifies the IP addresses of the nodes in the cluster and which data center and rack each node belongs to. This information allows requests to be routed efficiently, and it allows Cassandra to distribute replicas by grouping machines into data centers and racks.
As part of configuring our cluster, we needed to choose the appropriate snitch. On AWS EC2 there is the concept of internal and external IP addresses. Each instance receives one of each. Internal IPs are only accessible within an AWS region, while external IPs are accessible to the outside world. Ideally, our nodes communicate over internal IPs with other nodes within the same region and over external IPs with nodes in other regions in order to achieve the best performance.
Cassandra provides an “Ec2MultiRegionSnitch” implementation. This implementation would always connect over the public IP but detect when a node is in the same AWS EC2 region. Then it would replace the TCP connection with one over the private IP by leveraging the “gossiperStarting()” function of the snitch. This was just what we were looking to do! However, we quickly realized this was not a good option for us for two reasons:
- It makes the assumption that an AWS EC2 region is a data center, which is false in our use case, as our back-end data center lives in the same region as one of our front-end data centers.
- It relies on the AWS APIs to determine the node’s region and its private IP address from the public one. If there is one thing I’ve learned from using AWS extensively, it is that the EC2 APIs are its least reliable component. They are infamous for having outages. In fact, in the past six months there have been two such outages, during which existing EC2 instances were unaffected.
Of the provided Cassandra snitch strategies, we found that we liked the PropertyFileSnitch the best. The reason is that it is explicit and does not depend on any external service, hence it cannot go down. The PropertyFileSnitch configuration specifies the node’s IP, data-center name and rack information in a topology config file. So we decided to extend this configuration to include the private IP as well. We called it the GlobalPropertyFileSnitch, and our topology file format was as follows for each node:
We named our data centers according to their function (e.g. us-east-1-frontend, us-west-2-frontend, us-east-1-backend). For the rack name, we used the AWS EC2 availability zone of the instance (e.g. us-east-1e, us-east-1c, us-west-2a). From this information, we could infer the region, so that nodes in different data centers but the same region could also connect over private IP.
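The following sketch shows how such an entry could be parsed and how the region can be derived from the rack name. The line layout used here, publicIp=dataCenter:rack:privateIp, is an assumption made for illustration: it extends the stock PropertyFileSnitch layout (IP=DC:RACK) with one extra field and is not necessarily the exact format we used. The class and method names are hypothetical, and the parser handles IPv4 addresses only.

```java
// Illustrative sketch; the four-field line layout is an assumed format.
final class TopologyEntrySketch {
    final String publicIp;
    final String dataCenter;
    final String rack;
    final String privateIp;

    private TopologyEntrySketch(String publicIp, String dataCenter, String rack, String privateIp) {
        this.publicIp = publicIp;
        this.dataCenter = dataCenter;
        this.rack = rack;
        this.privateIp = privateIp;
    }

    // Parses one line of the assumed form: publicIp=dataCenter:rack:privateIp
    static TopologyEntrySketch parse(String line) {
        String[] keyValue = line.trim().split("=", 2);
        if (keyValue.length != 2) {
            throw new IllegalArgumentException("Expected key=value: " + line);
        }
        String[] parts = keyValue[1].split(":");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected dc:rack:privateIp: " + line);
        }
        return new TopologyEntrySketch(
                keyValue[0].trim(), parts[0].trim(), parts[1].trim(), parts[2].trim());
    }

    // Drops the trailing zone letter(s): "us-east-1e" becomes "us-east-1".
    static String regionOf(String availabilityZone) {
        int end = availabilityZone.length();
        while (end > 0 && Character.isLetter(availabilityZone.charAt(end - 1))) {
            end--;
        }
        if (end == 0 || end == availabilityZone.length()) {
            throw new IllegalArgumentException("Not an availability zone: " + availabilityZone);
        }
        return availabilityZone.substring(0, end);
    }

    // The rack holds the availability zone, so the region follows from it.
    String region() {
        return regionOf(rack);
    }

    public static void main(String[] args) {
        TopologyEntrySketch entry = parse("203.0.113.10=us-east-1-backend:us-east-1e:10.0.0.1");
        System.out.println(entry.dataCenter + " is in region " + entry.region());
    }
}
```

Because the region comes from the rack name rather than the data center name, a back-end node and a front-end node in the same region resolve to the same region string.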
Our GlobalPropertyFileSnitch implementation is almost exactly that of the PropertyFileSnitch, with two main modifications:
- We modified the file parsing to also parse out the private IP address of the node. This allowed us to avoid using the AWS EC2 API to get this information.
- We overrode the “gossiperStarting()” function, as it is done in the Ec2MultiRegionSnitch, and additionally provided our own GlobalReconnectableSnitchHelper in order to base the reconnect decision on the AWS EC2 region and not the data center name as the ReconnectableSnitchHelper does.
public void gossiperStarting() {
    // Register a helper that decides on reconnects by AWS region rather than by data center name.
    Gossiper.instance.register(new GlobalReconnectableSnitchHelper(this, getRegion(localPublicAddress), true));
}
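The decision the helper makes can be reduced to a few lines. The sketch below is a simplified, standalone illustration and not the helper's actual code: it uses no Cassandra classes, the names are hypothetical, and it assumes the peer's region and private IP are already known from the topology file.

```java
// Illustrative sketch of a region-based reconnect decision.
final class RegionReconnectSketch {
    private final String localRegion;

    RegionReconnectSketch(String localRegion) {
        this.localRegion = localRegion;
    }

    // Picks the private IP for peers in our own region, the public IP otherwise.
    String addressFor(String peerRegion, String peerPublicIp, String peerPrivateIp) {
        boolean sameRegion = localRegion.equals(peerRegion);
        if (sameRegion && peerPrivateIp != null) {
            return peerPrivateIp;
        }
        return peerPublicIp;
    }

    public static void main(String[] args) {
        RegionReconnectSketch local = new RegionReconnectSketch("us-east-1");
        // Same region, different data center: reachable over the private IP.
        System.out.println(local.addressFor("us-east-1", "203.0.113.10", "10.0.0.1"));
        // Different region: only the public IP is reachable.
        System.out.println(local.addressFor("us-west-2", "203.0.113.20", "10.1.0.1"));
    }
}
```

Comparing regions instead of data center names is what lets our back-end and front-end data centers in the same region talk over private IPs.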
It wasn’t long before we had a fully functioning cluster, and we launched it for our first few customers. We had fulfilled our first two goals. Our back-end pipelines were doing their thing and generating this awesome data, which was then served by our front-end APIs at low latencies, delivering great value to our customers. Everyone was happy and we were feeling great about our achievement, so it was thoroughly celebrated.
The next day we recovered from our celebration slightly hungover and set on to achieve the third goal: We needed to scale our data platform across many customers and applications. We had our minds set on delivering this great value to all of our current and future customers.
The Challenges of Scaling
As many engineering projects discover, scaling presents its own set of challenges. This was especially true for us because our requirements were so diverse. As we started onboarding more customers onto our new platform, we began to experience the pains of operating a data platform at large scale in a cloud environment.
In the remaining part of this blog post, I will list the toughest challenges we faced in trying to scale out our data platform. If you want to know how we resolved these challenges though, you’ll have to stay tuned for our upcoming blog posts.
Challenge 1) Scaling a fixed resource (Cassandra) to support an elastic resource (EMR).
As we started to run more back-end pipelines against our new cluster across different customers and applications, we found that back-end pipelines tend to generate peak loads. These peaks could expand elastically through launching multiple map-reduce jobs simultaneously. We attempted to control the back-end load, but this is very difficult to do across various dev teams. Even within a single dev team it’s a tough task to control pipelines across a large number of customers to ensure we stay within the capabilities of our Cassandra back-end data center.
We solved this problem with a one-two punch:
- A distributed throttling system. We called it the “rate-limiter.” It supports throttling based on operation (read, write, scan, connect) and supports independent throttling settings per application. In the third installment of our blog series on Cassandra at BloomReach, we’ll detail how this solution works.
- An Elastic Cassandra solution. The rate-limiter stabilized our cluster and gave it some relief, but at a cost: we had converted EMR from an elastic resource into a fixed one. This meant that in order to scale EMR, we now had to scale Cassandra. In our experience, adding and removing nodes on the cluster had been a difficult task, especially under heavy load, so automatically expanding and contracting the cluster to match peak loads was not an option. So we created the Elastic Cassandra solution, to be detailed in the fourth and final installment of our blog series.
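To give a feel for per-application, per-operation throttling, here is a minimal single-process token-bucket sketch. This is a stand-in technique chosen for illustration and not our rate-limiter, which is distributed and will be covered in its own post. All names are hypothetical, the burst size is assumed to equal one second of permits, and the clock is passed in explicitly so the behavior is easy to reason about.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative token-bucket sketch; local to one process, not distributed.
final class RateLimiterSketch {
    enum Operation { READ, WRITE, SCAN, CONNECT }

    private static final class Bucket {
        final double permitsPerSecond;
        double tokens;
        long lastRefillNanos;

        Bucket(double permitsPerSecond, long nowNanos) {
            this.permitsPerSecond = permitsPerSecond;
            this.tokens = permitsPerSecond; // start full: one second of burst
            this.lastRefillNanos = nowNanos;
        }
    }

    private final Map<String, Bucket> buckets = new HashMap<>();

    private static String key(String application, Operation operation) {
        return application + "/" + operation;
    }

    // Sets an independent limit for one application and operation type.
    synchronized void configure(String application, Operation operation,
                                double permitsPerSecond, long nowNanos) {
        buckets.put(key(application, operation), new Bucket(permitsPerSecond, nowNanos));
    }

    // Returns true if the operation may proceed at time nowNanos.
    synchronized boolean tryAcquire(String application, Operation operation, long nowNanos) {
        Bucket bucket = buckets.get(key(application, operation));
        if (bucket == null) {
            return true; // no limit configured for this pair
        }
        double elapsedSeconds = (nowNanos - bucket.lastRefillNanos) / 1e9;
        bucket.tokens = Math.min(bucket.permitsPerSecond,
                bucket.tokens + elapsedSeconds * bucket.permitsPerSecond);
        bucket.lastRefillNanos = nowNanos;
        if (bucket.tokens >= 1.0) {
            bucket.tokens -= 1.0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        RateLimiterSketch limiter = new RateLimiterSketch();
        long start = System.nanoTime();
        limiter.configure("pipeline-a", Operation.SCAN, 2.0, start);
        for (int i = 0; i < 3; i++) {
            System.out.println(limiter.tryAcquire("pipeline-a", Operation.SCAN, start));
        }
    }
}
```

A caller that is refused can back off and retry, which is how a throttle like this turns bursty pipeline load into a bounded request rate.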
Challenge 2) Scaling for the Holidays
Because BloomReach currently works primarily with e-commerce enterprises, we see extreme increases in traffic during the weeks of Thanksgiving and Christmas. For example, this year we expect a 10x increase in traffic for our services.
As previously mentioned, our experience with Cassandra is that expanding and contracting the size of the cluster is a costly operation. In our version of Cassandra and with our dataset, it usually takes us about six hours just to add a single node. Even worse, when removing a node while the cluster is under heavy load, we experience drastic latency spikes that cause us to miss our SLAs. Additionally, our Cassandra infrastructure is already one of the most expensive systems we run at BloomReach, so we really wanted to keep costs down.
This forced us to think long and hard about strategies to increase our capacity while keeping costs low and minimizing the number of nodes we needed to add. We came up with two strategies which, in combination, provide enough capacity that we will not need to add a single node in order to achieve 10x throughput. In the spirit of the holidays, our next installment of this blog series will be about how we achieved this tall task.
More to Come …
In this blog post, we described the strategy we employed and the decisions we made while launching a globally available Cassandra cluster. Please stay tuned in the coming weeks for the next three installments of our blog series. The next installment, “Increasing Cassandra Capacity for the Holidays Without Adding Nodes”, will describe how we increased our capacity for 10x read throughput without adding any nodes to the cluster.
The third installment, “Distributed QPS Monitoring and Throttling System,” will describe a system we built in order to improve the stability of our cluster by preventing it from being issued more operations than it can support.
The fourth and final installment will describe our Elastic Cassandra solution, which is how we solved the problem of horizontally scaling the number of concurrent map-reduce pipelines we can run at any given time, so that we can support the elastic nature of map-reduce to its full capacity.