Previously, in “Crawling Billions of Pages: Building Large Scale Crawling Cluster (Part 1),” we talked about how to build an asynchronous fetcher that downloads raw HTML pages efficiently. Now we have to go from a single machine to a cluster of fetchers, so we need a way to synchronize all the fetcher nodes so that their work doesn’t overlap. Essentially, we want a queue. The requirements for the queuing system are:
- Handle billions of URLs.
- Maintain a constant crawling speed for each domain.
- Dynamically reschedule failed jobs.
- Provide real-time status reports for the crawling cluster.
- Support multiple queues with different priorities.
In BloomReach’s case, we use a single Redis server with 16GB of memory, which can handle about 16 billion URLs. That is equivalent to roughly one URL per byte of memory. What follows is a description of our design process and some of its highlights.
Minimize the size of each entry
The first challenge in designing a queuing system is to figure out what data you need to put into the queue. Once the size of each entry is determined, it is a lot easier to estimate the capacity of the queuing system. In our case, if we put every URL into the queue as its own entry, and assume each URL consumes 512 bytes, our 16GB system would hold only about 32 million URLs. That is nowhere near enough to handle billions of URLs.
So we have to combine multiple URLs into a single run, which corresponds to one task. In our system, we have 1,000 URLs per run. The queue entry for a task is then just a single path pointing to an S3 (Amazon Simple Storage Service) file that contains the list of 1,000 URLs. Effectively, we only need about 0.5KB of memory for every 1,000 URLs. Therefore, we should be able to handle 16 billion URLs with just 8GB of memory. Sounds easy, doesn’t it?
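The arithmetic behind these two estimates can be checked with a short sketch. The 512-byte entry size and the 1,000 URLs per task come from the text; the function names are made up for illustration, and the sketch ignores any per-entry overhead that Redis itself adds.

```python
GIB = 1024 ** 3          # bytes in one GiB
ENTRY_BYTES = 512        # assumed size of one queue entry (from the text)
URLS_PER_TASK = 1_000    # URLs listed in one S3 file (from the text)


def urls_addressable(memory_bytes, urls_per_entry):
    """URLs a queue can cover when each entry stands for urls_per_entry URLs."""
    return (memory_bytes // ENTRY_BYTES) * urls_per_entry


def memory_needed(total_urls, urls_per_entry):
    """Bytes of queue memory needed to cover total_urls."""
    entries = -(-total_urls // urls_per_entry)  # ceiling division
    return entries * ENTRY_BYTES


# One URL per entry: a 16 GiB server holds only about 33.5 million URLs.
one_url_per_entry = urls_addressable(16 * GIB, 1)

# One S3 path per 1,000 URLs: 16 billion URLs need about 8.2 billion bytes.
batched_bytes = memory_needed(16_000_000_000, URLS_PER_TASK)

print(one_url_per_entry, batched_bytes)
```

Batching shrinks the in-memory footprint by the batch size, which is what moves the design from tens of millions of URLs to billions.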
Transfer of Task Ownership
A typical queuing system allows task entries to be pushed in and popped out. Effectively, the ownership of a task is transferred from the producer of the task entry to the queue, and then from the queue to the task executor when the task is popped out of the queue.
The downside of this naive approach is that if a fetcher node crashes for any reason, the tasks it is currently running will be lost. There are several ways to avoid this:
- Provide a task timeout. The queuing system can reclaim the task after a certain period of time.
Pros: Clients of the queue don’t need to change.
Cons: Once the task is reclaimed and rescheduled, it may run concurrently with the stale task.
- Clients of the queue keep their running tasks in a file, and restart them after a crash.
Pros: Queue server doesn’t need to change.
Cons: Clients need to have a persistent database to maintain the running tasks.
Yet, there is still the need to transfer task ownership between client and server, which makes the system unnecessarily complicated. What we can do instead is keep the ownership of those tasks in the server after they are enqueued. That way the client doesn’t have any ownership of the task at any time. Instead, the server exposes a list of tasks for each client, and the client itself stays stateless. It is very similar to how a library system works. Imagine every task is a book in the library and readers are clients who would like to read as much as possible. If we allow readers to ‘check-out’ books, we face the risk that some readers may lose their books. One solution is to never allow readers to ‘check-out’ books, and instead have them read digital versions online. That way, all the content stays in the library and there is no risk of losing books.
Each fetcher, the client of the queuing system, can have several tasks running in parallel at any moment. In the library analogy, a reader is allowed to read several books at the same time. So internally, the queuing system maintains a list of the tasks each client is currently running. From that task list, each client can check which tasks it should execute without actually looking into the queue. This approach dramatically reduces the complexity of the system because each client doesn’t need to know how the queue is implemented inside the queuing system. We will see how this helps build a more complex scheduling algorithm later.
In this design, each client periodically syncs its own task list with the server. New tasks are started once a client discovers new items on the list. Conversely, once a task is marked done by a client, the server removes the task and adds a new task to the list.
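The sync protocol described here can be sketched in a few lines. This is a minimal in-memory illustration, not the Redis-backed implementation: the class name `TaskServer`, the `sync` method, the `slots_per_client` parameter, and the task names are all hypothetical.

```python
from collections import deque


class TaskServer:
    """Keeps ownership of every task; clients only read their own list."""

    def __init__(self, tasks, slots_per_client=2):
        self.pending = deque(tasks)       # tasks not yet on any client's list
        self.slots = slots_per_client     # parallel tasks allowed per client
        self.assigned = {}                # client id -> list of task paths

    def sync(self, client_id, done=()):
        """Drop tasks the client reports as done, top the list back up,
        and return the tasks the client should be running right now."""
        finished = set(done)
        current = self.assigned.setdefault(client_id, [])
        current[:] = [task for task in current if task not in finished]
        while len(current) < self.slots and self.pending:
            current.append(self.pending.popleft())
        return list(current)


server = TaskServer(["run-1", "run-2", "run-3"])

first = server.sync("fetcher-a")                        # ['run-1', 'run-2']
after_crash = server.sync("fetcher-a")                  # same list again
after_done = server.sync("fetcher-a", done=["run-1"])   # ['run-2', 'run-3']
print(first, after_crash, after_done)
```

Because the list lives on the server, a fetcher that crashes and restarts simply syncs again and receives the same tasks; nothing has to be reclaimed or persisted on the client.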
Currently, more than 60 percent of global internet traffic consists of requests from crawlers or some type of automated Web discovery system. A typical website with a single node can handle from one request per second to hundreds of requests per second, depending on its scalability and implementation. Crawling a website is therefore likely to put load on its Web servers, so a Web crawler should be polite and crawl each domain at a very moderate speed, usually no faster than one page every 10 seconds. Crawling at an extremely high speed could be considered a distributed denial-of-service (DDoS) attack, and a perceived DDoS attack will result in your crawler quickly being blacklisted. One straightforward way to control the overall crawling speed is to control the number of fetching jobs running across the whole cluster.
On the other hand, one-off requests may come in for a variety of reasons. People want to debug a certain set of pages or, more commonly, part of a website changes faster than the rest. For example, the homepage is usually updated more often than other pages. Therefore, having multiple queues for each domain is very useful in many cases.
The temptation is to create a priority queue with each task preassigned a priority. That way you can always process the high-priority tasks first. A priority queue has two issues. First, the priority value stored in each entry may increase the size of the task in memory. Second, high-priority tasks will block low-priority tasks, which is called starvation in scheduling algorithms. Monitoring the queue is also a challenge, since most queuing systems use a linked list as the underlying data structure, and traversing the entire queue to read the associated metadata is not practical.
A better approach is to use a randomized scheduler in the spirit of weighted fair queuing. With this approach, we randomly schedule jobs from a group of queues, and individual jobs do not carry a priority of their own. The chance of a queue being selected for scheduling is proportional to the priority of that queue.
A cool thing about this approach is that there is no starvation. Even jobs on queues with very low priority will get scheduled at some point. Reading the progress of each queue is now a trivial matter, because tasks with different priorities are put in different queues instead of being mixed into a single priority queue.
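A minimal sketch of such a scheduler follows. It uses plain weighted random selection over the non-empty queues, which is closer to lottery scheduling than to the classic deterministic weighted fair queuing algorithm; the function name `pick_task`, the queue names, and the weights are assumptions made for this example.

```python
import random
from collections import deque


def pick_task(queues, weights, rng=random):
    """Pick one task from a randomly chosen non-empty queue.

    A queue is chosen with probability proportional to its weight.
    Returns (queue_name, task), or None when every queue is empty.
    """
    candidates = [name for name, queue in queues.items() if queue]
    if not candidates:
        return None
    chosen = rng.choices(
        candidates, weights=[weights[name] for name in candidates], k=1
    )[0]
    return chosen, queues[chosen].popleft()


queues = {
    "homepage": deque(["home-1", "home-2", "home-3"]),
    "bulk": deque(["bulk-1", "bulk-2"]),
}
weights = {"homepage": 9, "bulk": 1}   # priority lives on the queue, not the task
rng = random.Random(0)                 # seeded only to make the demo repeatable

order = []
while (picked := pick_task(queues, weights, rng)) is not None:
    order.append(picked)
print(order)
```

Every task is eventually drained, including those on the low-weight queue, and the length of each deque gives per-queue progress without traversing anything.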
On top of the scheduling and task dispatching, monitoring and showing stats is crucial for production operation. The monitoring server doesn’t have to be on the same node as the queue, but we put them on the same node for simplicity.
The goal is to show real-time aggregations by different criteria, such as per queue or per fetcher node. Every second, each node sends back the effective crawling speed and error rate for each task (a set of 1,000 URLs). We can selectively aggregate the data and display the final results. If you imagine the input values as a single vector, the whole aggregation process can be written down as a single matrix multiplication. Each element in the matrix has a value of either zero or one. Zero means the corresponding input value is not selected for aggregation, and one means the value is selected.
Matrix operations can be optimized with numerical libraries that take advantage of vectorized hardware operations, such as NumPy. Instead of hard-coding those operations in a nested loop, we transform the whole aggregation process into a single matrix multiplication and then leverage NumPy to perform the operation for us. After that, we can get the aggregated value every second, and then roll those values up into minute, hour, and daily aggregations.
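As an illustration, the sketch below aggregates one second of per-task reports by queue and by fetcher node using zero/one selection matrices. The sample data, the choice of page and error counts as the reported metrics, and the helper name `selection_matrix` are assumptions; only `numpy.array` and the `@` matrix-multiplication operator are used from NumPy.

```python
import numpy as np

# One report per running task for the current second:
# (queue, fetcher node, pages fetched, errors). Hypothetical sample data.
samples = [
    ("homepage", "node-a", 5, 0),
    ("bulk", "node-a", 3, 1),
    ("bulk", "node-b", 6, 0),
    ("homepage", "node-b", 2, 0),
]


def selection_matrix(labels, groups):
    """Rows are groups, columns are tasks; 1 selects a task into a group."""
    return np.array(
        [[1 if label == group else 0 for label in labels] for group in groups]
    )


values = np.array([[pages, errors] for _, _, pages, errors in samples])

queue_names = ["homepage", "bulk"]
node_names = ["node-a", "node-b"]

# Each aggregation is a single matrix product: (groups x tasks) @ (tasks x metrics).
per_queue = selection_matrix([s[0] for s in samples], queue_names) @ values
per_node = selection_matrix([s[1] for s in samples], node_names) @ values

print(per_queue.tolist())  # [[7, 0], [9, 1]]
print(per_node.tolist())   # [[8, 1], [8, 0]]
```

Adding a new aggregation criterion only means building another selection matrix; the multiplication itself stays the same.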
The Web UI can get real-time graphs from the server without joining the raw data every second. This approach basically amortizes the expensive join over a given time period by running a partial join operation every second.
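One way to picture this amortization is a set of running totals that are updated once per second, so a minute or hour view never has to rescan raw samples. The class below is a simplified sketch under that assumption; the name `Rollup` and its fields are hypothetical, and it tracks a single summed metric.

```python
class Rollup:
    """Running totals that roll per-second values up to minutes and hours."""

    def __init__(self):
        self.seconds_seen = 0
        self.minute_total = 0   # partial total for the minute in progress
        self.hour_total = 0     # partial total for the hour in progress
        self.minutes = []       # completed per-minute totals
        self.hours = []         # completed per-hour totals

    def add_second(self, value):
        """Fold one per-second aggregate into the running totals."""
        self.seconds_seen += 1
        self.minute_total += value
        if self.seconds_seen % 60 == 0:
            self.minutes.append(self.minute_total)
            self.hour_total += self.minute_total
            self.minute_total = 0
        if self.seconds_seen % 3600 == 0:
            self.hours.append(self.hour_total)
            self.hour_total = 0


rollup = Rollup()
for _ in range(3600):        # one hour of reports at 2 pages per second
    rollup.add_second(2)

print(len(rollup.minutes), rollup.minutes[0], rollup.hours)
```

Each update does a constant amount of work, so the cost of producing the longer-range graphs is spread evenly across the seconds that feed them.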
Handling billions of URLs in a single queuing system is definitely achievable. The key takeaway is to have the right architecture to help with data sharding and failure handling. Making every component stateless is essential for scaling out. Our queuing system, which has been running for more than a year, has handled more than 12 billion URLs without any sign of slowing down. The queuing system is a cornerstone for the robustness of our crawling system.