Distributed QPS monitoring and throttling system

This is a blog post by Cheng Ren, Engineer on the Distributed Data Platform Team

Introduction

This post is the third installment of the BloomReach Cassandra Infrastructure series. In previous posts, we described the overall architecture of our Cassandra cluster and how we handle surging peak traffic. This post will focus on a critical component safeguarding the whole Cassandra infrastructure: a QPS monitoring and throttling system which we call the “rate limiter”.

Context

We are a big data company. We write Hadoop pipelines to process large amounts of data for our customers in a scalable way. Those pipelines are categorized by purpose: for example, we have personalization (P13N) pipelines persisting user events to a Cassandra (C*) database, data-ETL pipelines serializing product data to C* tables, and indexing pipelines indexing products from C* to Solr. Every day those pipelines read and write data from/to our C* back-end cluster at tens of thousands of QPS at peak time. We also have front-end data centers serving latency-sensitive applications with live traffic from customers’ websites. Our infrastructure is illustrated in Fig. 1 and described in detail in a previous blog post. The two different use cases impose two different requirements on our infrastructure:

  • For the back-end data center, we want to provide high throughput and availability to consuming pipelines
  • For the front-end data center, keeping latency below SLA is the highest priority

These two requirements create intrinsic challenges to our infrastructure:

  • High back-end pipeline throughput brings latency spikes. Between the back-end and front-end data centers, we rely on C* internal write replication to achieve consistency. When pipelines write to the back-end cluster at maximum throughput, the replication flows to the front-end data centers, contending for their hardware resources and causing latency spikes.
  • A few misbehaving pipelines can churn through the back-end cluster’s resources, sidelining other pipelines. For example, a misbehaving pipeline might send full table scans or read requests at a rate the cluster cannot handle.

Fig.1 Cassandra infrastructure overview


Altogether these issues can be abstracted as a resource allocation problem: how can we best allocate the back-end data center’s resources to meet the heterogeneous needs of various pipelines, while not hurting the performance of the front-end data center? Since our back-end data center’s capacity is fixed and we don’t want to exceed that limit for any operation type, one approach would be limiting the total number of operations, per type, per second, and rejecting further operations once the limit is reached. However, this doesn’t work for us, since we serve multiple pipelines: such a restriction would cause starvation if one dominating pipeline continuously occupied the quota.

Rate limiter

We tackle the problem with an in-house-developed distributed QPS monitoring and throttling system, which we call the “rate limiter”: it monitors and throttles QPS per pipeline type and operation. Specifically, the system is able to perform the following actions:

1) Enforce a maximum QPS (reads, writes, scans…) restriction for each pipeline type (P13N, indexing, data-ETL…). Once requests exhaust the allowed QPS tokens, violating operations are paused until new QPS tokens become available.

2) Visualize the QPS (reads, writes, scans) each pipeline type has been producing.

At a high level, the rate limiter consists of the following components:

  1. A Redis node, storing QPS and token status per pipeline type, per operation
  2. Token management logic, implemented in Node.js, refreshing tokens and calculating QPS from token usage at a fixed interval.
  3. A rate limiter client requesting tokens from a Redis server and throttling the request if all the tokens are used up.
  4. A control dashboard allowing users to adjust QPS quota.
  5. A visualization dashboard illustrating how many QPS every pipeline type has been issuing. 

Fig. 2 Rate Limiter Overall Architecture

Rate limiter client

The rate limiter client is a layer we introduced that is used across all back-end pipelines. Before any operation against the back-end data center starts, the client queries the rate limiter server for tokens. Each database hit consumes one token, and the client can fetch tokens in batches, for example one token per second or 10 tokens per 10 seconds. The batch size is a trade-off between the freshness of the QPS numbers and the load on the rate limiter server: fetching one token per second puts a higher load on the server, but the results are more accurate and real-time. The server decrements the tokens and returns the number of remaining tokens. When the client sees a remaining count below zero, it waits a few seconds and asks for tokens again, repeating until it gets a non-negative number of remaining tokens.
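The client loop described above can be sketched as follows. This is a minimal illustration in Python; the key layout, the `FakeRedis` stand-in, and the backoff interval are assumptions, not the production implementation (which talks to a real Redis server):

```python
import time


class FakeRedis:
    """In-memory stand-in for the Redis node, for illustration only."""

    def __init__(self):
        self.data = {}

    def set(self, key, value):
        self.data[key] = value

    def decrby(self, key, amount):
        # Mirrors Redis DECRBY: decrement and return the new value.
        self.data[key] = self.data.get(key, 0) - amount
        return self.data[key]


class RateLimiterClient:
    """Sketch of the client-side throttling loop."""

    def __init__(self, redis, pipeline, operation, wait_seconds=1):
        self.redis = redis
        self.key = "tokens:%s:%s" % (pipeline, operation)  # assumed key layout
        self.wait_seconds = wait_seconds

    def acquire(self, tokens=1):
        # Each database hit consumes one token.
        while True:
            remaining = self.redis.decrby(self.key, tokens)
            if remaining >= 0:
                return remaining
            # Tokens exhausted: back off until the server refills the bucket.
            time.sleep(self.wait_seconds)
```

A pipeline would call `acquire()` before each C* request and proceed only once it returns.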

Rate limiter server

The rate limiter server includes two components:

  • One Redis database keeping track of available tokens and QPS for each pipeline/operation.
  • A Node.js module replenishing tokens and recalculating QPS.

We chose Redis because it provides high throughput and low latency for small data.

We need to store tokens and QPS per pipeline type/operation, and we have fewer than 10 pipeline types and only a handful of operation types. All of them combined require only a few hundred key-value pairs. With such a small data size, Redis exhibits surprisingly high throughput. In our experiments, one Amazon AWS c3.large (Intel Xeon E5-2680 v2 processor, 2 vCPUs, 3.75 GB memory, 2 x 16 GB SSD storage) Redis node is able to take the full load from all our pipelines, which translates to tens of thousands of rate limiter queries per second and concurrent rate limiter connections. The low latency that comes with an in-memory database is also a highly desirable trait, since we don’t want the extra throttling layer to undermine our pipelines’ SLA.

Redis mainly keeps three types of data for each pipeline/operation:

  1. Maximum allowed QPS, which is defined by the user through a control dashboard (Fig. 3).
  2. Tokens, which represent how many requests the particular pipeline type/operation can still make. A negative value means no more tokens are available and this request type needs to wait.
  3. QPS, which is calculated based on token usage.

Tokens will be replenished and QPS will be recalculated every 10 seconds.
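The periodic refresh can be sketched like this, in Python for brevity (the real logic lives in the Node.js module); the key names and the plain-dict stand-in for Redis are assumptions:

```python
def refresh(store, pipeline, operation, interval_seconds=10):
    """Replenish tokens and recalculate QPS for one pipeline/operation.

    `store` stands in for Redis; `max_qps` is the user-configured limit.
    """
    key = "%s:%s" % (pipeline, operation)
    budget = store["max_qps:" + key] * interval_seconds
    remaining = store.get("tokens:" + key, budget)
    used = budget - remaining               # tokens consumed in the last window
    store["qps:" + key] = used / float(interval_seconds)
    store["tokens:" + key] = budget         # refill for the next window
```

For example, with a 100 QPS limit and 400 of the 1,000 tokens left after a 10-second window, the derived QPS is 60.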

Bytes per Second(BPS) throttling and reporting

QPS throttling gave our cluster more stability, but we were still seeing front-end latency spikes caused by writes from certain pipelines, even when their QPS was under strict control. After a thorough investigation, we found those write requests usually carried larger data, like a large blob or a collection of long strings. From the database’s point of view, processing such heavy requests is generally more expensive than processing writes of small data, like a timestamp or a single string. To have finer-granularity control over pipelines, we decided to apply BPS monitoring and throttling to our system as well. Its throttling logic is the same as for QPS; the only difference is that for each write query, the rate limiter client calculates the byte size and asks the server for that many byte tokens.
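For BPS accounting the client needs a byte-size estimate per write. A rough sketch follows; the exact serialization the production client measures is an assumption here:

```python
def payload_bytes(row):
    """Rough size of one write: column names plus encoded values."""
    total = 0
    for column, value in row.items():
        data = value if isinstance(value, bytes) else str(value).encode("utf-8")
        total += len(column.encode("utf-8")) + len(data)
    return total
```

The client would then request `payload_bytes(row)` byte tokens for the write instead of a single QPS token.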

Dynamic token allocation

In contrast to static token allocation, where a pipeline/operation is throttled once its allocated tokens are used up, we built dynamic quota allocation into the token-refilling logic, which lets an under-allocated pipeline better utilize the unused tokens of others. Every two minutes the rate limiter server redistributes tokens with the following algorithm:

  1. Identify under-allocated and over-allocated pipelines/operations based on real QPS versus maximum allowed QPS.
  2. Move 50 percent of the unused tokens from over-allocated pipelines/operations to under-allocated ones.
  3. Check the token usage of lending pipelines/operations, and immediately reclaim their lent tokens if they are running out.
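Steps 1 and 2 above might look like the following sketch (the data shapes are assumptions, and the reclaim in step 3 is omitted for brevity):

```python
def redistribute(max_qps, real_qps, tokens):
    """Shift 50% of unused tokens from over-allocated pipelines/operations
    (real QPS below their limit) to under-allocated ones (at their limit).
    All three arguments map a 'pipeline:operation' key to a number."""
    lenders = [k for k in max_qps if real_qps[k] < max_qps[k]]
    borrowers = [k for k in max_qps if real_qps[k] >= max_qps[k]]
    if not lenders or not borrowers:
        return tokens
    pool = 0
    for k in lenders:
        lent = tokens[k] // 2           # lend half of the unused tokens
        tokens[k] -= lent
        pool += lent
    share = pool // len(borrowers)      # split the pool evenly
    for k in borrowers:
        tokens[k] += share
    return tokens
```

Here a pipeline running at its limit with zero tokens left would receive half of an idle pipeline's unused tokens at the next rebalancing pass.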

We make dynamic token redistribution the default, but we also allow users to switch to static mode at runtime (Fig. 3). The rationale is that we want to maximize pipelines’ throughput while respecting the total limit, yet still restrict suspicious or hostile pipelines from taking others’ tokens. For example, with the settings in Fig. 3, PIPELINE-C is isolated from dynamic token allocation.

Fig. 3 Rate Limiter Control Dashboard


Observability and manageability

Our monitoring dashboard is shown in Fig. 4. We also have a script running on the rate limiter server feeding QPS/BPS data to the company’s central Graphite monitoring server:

Fig. 4 READ QPS Monitoring


More to come

The rate limiter has been in production use for two years. It processes Cassandra requests from all of our pipelines, across the company, at a peak rate of 30,000 QPS. One of our design goals for the rate limiter is to be a generic throttling system safeguarding any critical infrastructure, so we are also planning to roll the rate limiter out to other infrastructure inside the company, like SolrCloud, and to open source it in the future. So stay tuned!


Warren Mar