Mussel: Airbnb’s Key-Value Store for Derived Data | by Shouyan Guo | The Airbnb Tech Blog

How Airbnb built a persistent, highly available, low latency key-value storage engine for accessing derived data from offline and streaming events.

By: Chandramouli Rangarajan, Shouyan Guo, Yuxi Jin

Within Airbnb, many online services need access to derived data, which is data computed with large scale data processing engines like Spark or from streaming events like Kafka and stored offline. These services require a high quality derived data storage system, with strong reliability, availability, scalability, and latency guarantees for serving online traffic. For example, the user profiler service stores and accesses real-time and historical user activities on Airbnb to deliver a more personalized experience.

In this post, we will talk about how we leveraged a number of open source technologies, including HRegion, Helix, Spark, Zookeeper, and Kafka, to build a scalable and low latency key-value store for hundreds of Airbnb product and platform use cases.

Over the past few years, Airbnb has evolved and enhanced our support for serving derived data, moving from teams rolling out custom solutions to a multi-tenant storage platform called Mussel. This evolution can be summarized in three stages:

Stage 1 (01/2015): Unified read-only key-value store (HFileService)

Before 2015, there was no unified key-value store solution inside Airbnb that met four key requirements:

  1. Scale to petabytes of data
  2. Efficient bulk load (batch generation and uploading)
  3. Low latency reads (<50ms p99)
  4. Multi-tenant storage service that can be used by multiple customers

None of the existing solutions were able to meet these requirements. MySQL didn’t support bulk loading, HBase’s massive bulk loading (distcp) was neither optimal nor reliable, RocksDB had no built-in horizontal sharding, and we didn’t have enough C++ expertise to build a bulk load pipeline that supported the RocksDB file format.

So we built HFileService, which internally used HFile (the building block of Hadoop HBase, which is based on Google’s SSTable):

Fig. 1: HFileService Architecture
  1. Servers were sharded and replicated to address scalability and reliability issues
  2. The number of shards was fixed (equal to the number of Hadoop reducers in the bulk load jobs) and the mapping of servers to shards was stored in Zookeeper. We configured the number of servers mapped to a specific shard by manually changing the mapping in Zookeeper
  3. A daily Hadoop job transformed offline data to HFile format and uploaded it to S3. Each server downloaded the data of its own partitions to local disk and removed the old versions of the data
  4. Different data sources were partitioned by primary key. Clients determined the correct shard for a request by calculating the hash of the primary key modulo the total number of shards. They then queried Zookeeper to get a list of servers that had that shard and sent the request to one of them
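
The client-side routing in step 4 can be sketched roughly as follows. This is a minimal illustration, not HFileService code: the MD5-based hash, the server names, and the in-memory `SHARD_TO_SERVERS` dictionary (standing in for the mapping kept in Zookeeper) are all assumptions. The shard count of 8 is the HFileService figure quoted later in this post.

```python
import hashlib
import random

NUM_SHARDS = 8  # HFileService shard count quoted later in the post

# Hypothetical stand-in for the shard-to-server mapping stored in Zookeeper.
SHARD_TO_SERVERS = {
    shard: [f"server-{shard}-a", f"server-{shard}-b"]
    for shard in range(NUM_SHARDS)
}


def shard_for_key(primary_key: str, num_shards: int = NUM_SHARDS) -> int:
    """Hash the primary key and take it modulo the number of shards."""
    # A stable hash is used so every client computes the same shard;
    # the choice of MD5 here is an assumption for illustration only.
    digest = hashlib.md5(primary_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


def pick_server(primary_key: str) -> str:
    """Look up the replicas of the key's shard and pick one of them."""
    replicas = SHARD_TO_SERVERS[shard_for_key(primary_key)]
    return random.choice(replicas)
```

Because the shard is a pure function of the key and the shard count, changing the number of shards remaps most keys, which is one reason a fixed shard count was used.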

Stage 2 (10/2015): Store both real-time and derived data (Nebula)

While we had built a multi-tenant key-value store that supported efficient bulk load and low latency reads, it had its drawbacks. For example, it didn’t support low-latency point writes, and any update to the stored data had to go through the daily bulk load job. As Airbnb grew, there was an increased need for low latency access to real-time data.

Therefore, Nebula was built to support both batch-update and real-time data in a single system. It internally used DynamoDB to store real-time data and S3/HFile to store batch-update data. Nebula introduced timestamp-based versioning as a version control mechanism. For read requests, data would be read from both a list of dynamic tables and the static snapshot in HFileService, and the results merged based on timestamp.
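
The timestamp-based read merge can be sketched as below. This is a minimal illustration rather than Nebula's implementation: plain dictionaries stand in for the DynamoDB dynamic tables and the HFileService snapshot, and the function and type names are hypothetical.

```python
from typing import Dict, Iterable, Optional, Tuple

# A stored value tagged with the timestamp of the write that produced it.
Versioned = Tuple[int, str]  # (timestamp, value)


def merge_read(
    key: str,
    dynamic_tables: Iterable[Dict[str, Versioned]],
    static_snapshot: Dict[str, Versioned],
) -> Optional[Versioned]:
    """Read the key from every source and keep the newest version."""
    candidates = [table[key] for table in dynamic_tables if key in table]
    if key in static_snapshot:
        candidates.append(static_snapshot[key])
    if not candidates:
        return None
    # Timestamp-based versioning: the highest timestamp wins.
    return max(candidates, key=lambda versioned: versioned[0])
```

Every read fans out to all sources in this sketch, which illustrates why reducing the number of sources to merge online is attractive.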

To minimize online merge operations, Nebula also had scheduled Spark jobs that ran daily and merged snapshots of DynamoDB data with the static snapshot of HFileService. Zookeeper was used to coordinate write availability of dynamic tables, the marking of snapshots as ready for read, and the dropping of stale tables.

Fig. 2: Nebula Architecture

Stage 3 (2018): Scalable and low latency key-value storage engine (Mussel)

With Nebula in Stage 2, we had built a system that supported both reads and writes on real-time and batch-update data with timestamp-based conflict resolution. However, there were opportunities for improvement:

  1. Scale-out challenge: It was cumbersome to manually edit partition mappings inside Zookeeper as data grew, or to horizontally scale the system for increasing traffic by adding additional nodes
  2. Read performance needed to improve under spiky write traffic
  3. High maintenance overhead: We needed to maintain HFileService and DynamoDB at the same time
  4. Inefficient merging process: The process of merging the delta update from DynamoDB and HFileService daily became very slow as our total data size became larger. The daily update data in DynamoDB was just 1–2% of the baseline data in HFileService. However, we re-published the full snapshot (102% of total data size) back to HFileService daily

To address these drawbacks, we came up with a new key-value store system called Mussel.

  1. We introduced Helix to manage the partition mapping within the cluster
  2. We leveraged Kafka as a replication log to replicate writes to all of the replicas instead of writing directly to the Mussel store
  3. We used HRegion as the only storage engine in the Mussel storage nodes
  4. We built a Spark pipeline to load the data from the data warehouse into storage nodes directly

Let’s go into more detail in the following paragraphs.

Fig. 3: Mussel Architecture

Manage partitions with Helix

In Mussel, in order to make our cluster more scalable, we increased the number of shards from 8 in HFileService to 1024. Data is partitioned into those shards by the hash of the primary key, so we introduced Apache Helix to manage these many logical shards. Helix manages the mapping of logical shards to physical storage nodes automatically. Each Mussel storage node can hold multiple logical shards, and each logical shard is replicated across multiple Mussel storage nodes.
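
To make the shape of that mapping concrete, here is a toy placement of 1024 logical shards onto a handful of nodes. It is only a sketch: the round-robin scheme, the node names, and the replica count of 3 are assumptions for illustration, and this is not how Helix computes its assignments.

```python
from collections import Counter
from typing import Dict, List

NUM_SHARDS = 1024  # logical shard count from the post


def assign_shards(
    nodes: List[str], num_shards: int = NUM_SHARDS, replicas: int = 3
) -> Dict[int, List[str]]:
    """Place each logical shard on `replicas` distinct nodes, round-robin."""
    if replicas > len(nodes):
        raise ValueError("need at least as many nodes as replicas")
    return {
        shard: [nodes[(shard + r) % len(nodes)] for r in range(replicas)]
        for shard in range(num_shards)
    }


def shards_per_node(mapping: Dict[int, List[str]]) -> Counter:
    """Count how many shard replicas each node ends up holding."""
    return Counter(node for owners in mapping.values() for node in owners)
```

With 8 nodes and 3 replicas, every node in this sketch holds 384 shard replicas, so adding nodes spreads the same 1024 shards more thinly without changing how keys map to shards.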

Leaderless Replication with Kafka

Since Mussel is a read-heavy store, we adopted a leaderless architecture. Read requests can be served by any of the Mussel storage nodes that hold the same logical shard, which increases read scalability. In the write path, we needed to consider the following:

  1. We want to smooth the write traffic to avoid impacting the read path
  2. Since we don’t have a leader node in each shard, we need a way to make sure each Mussel storage node applies the write requests in the same order so the data is consistent across different nodes

To solve these problems, we introduced Kafka as a write-ahead log. A write request is not sent directly to a Mussel storage node; instead, it is first written to Kafka asynchronously. The Kafka topic has 1024 partitions, each belonging to one logical shard in Mussel. Each Mussel storage node polls the events from Kafka and applies the changes to its local store. Although there is no leader-follower relationship among the replicas of a shard, every replica consumes the same partition in the same order, which preserves write ordering within a partition and ensures consistent updates. The drawback is that this design can only provide eventual consistency. However, given the derived data use case, it is an acceptable tradeoff to compromise on consistency in the interest of ensuring availability and partition tolerance.
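
The ordering argument can be illustrated with a small in-memory simulation. This is a sketch only: `PartitionedLog` is a hypothetical stand-in for a Kafka topic (no Kafka client is used), and `StorageNode` is a hypothetical stand-in for a Mussel storage node holding one shard.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Record = Tuple[str, str]  # (key, value)


class PartitionedLog:
    """In-memory stand-in for a Kafka topic: one ordered list per partition."""

    def __init__(self) -> None:
        self._partitions: Dict[int, List[Record]] = defaultdict(list)

    def append(self, partition: int, record: Record) -> int:
        """Append a write and return its offset within the partition."""
        self._partitions[partition].append(record)
        return len(self._partitions[partition]) - 1

    def read_from(self, partition: int, offset: int) -> List[Record]:
        """Return all records at or after the given offset, in log order."""
        return self._partitions[partition][offset:]


class StorageNode:
    """Stand-in for a storage node that replays one shard's partition."""

    def __init__(self, shard: int) -> None:
        self.shard = shard
        self.offset = 0  # next log offset this replica has not applied yet
        self.store: Dict[str, str] = {}

    def poll_and_apply(self, log: PartitionedLog) -> None:
        """Apply every unseen write in log order, then advance the offset."""
        for key, value in log.read_from(self.shard, self.offset):
            self.store[key] = value
            self.offset += 1
```

Two replicas that poll at different times hold different data for a while, but once both have consumed the partition up to the same offset their stores are identical, which is the eventual consistency described above.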

Supporting read, write, and compaction in one storage engine

In order to reduce the hardware cost and operational load of managing DynamoDB, we decided to remove it and extend HFileService as the only storage engine to serve both real-time and offline data. To better support both read and write operations, we used HRegion instead of HFile. HRegion is a fully functional key-value store with MemStore and BlockCache. Internally it uses a Log-Structured Merge (LSM) tree to store the data and supports both read and write operations.

An HRegion table contains column families, which are the logical and physical grouping of columns. Inside a column family there are column qualifiers, which are the columns. Column families contain columns with timestamped versions. Columns only exist when they are inserted, which makes HRegion a sparse database. We mapped our client data to HRegion as follows:

With this mapping, for read queries, we are able to support:

  1. Point query by looking up the data with the primary key
  2. Prefix/range query by scanning data on the secondary key
  3. Queries for the latest data or data within a specific time range, as both real-time and offline data written to Mussel have a timestamp
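
The three query types can be sketched against a toy sparse, versioned table. The mapping itself is not reproduced in this text, so the layout here is an assumption: the primary key is treated as the row, the secondary key as the column qualifier, and each cell keeps timestamped versions. `SparseTable` and its methods are hypothetical names, not HRegion APIs.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Version = Tuple[int, str]  # (timestamp, value)


class SparseTable:
    """Toy table: row (primary key) -> column (secondary key) -> versions."""

    def __init__(self) -> None:
        self._rows = defaultdict(lambda: defaultdict(list))

    def put(self, primary: str, secondary: str, ts: int, value: str) -> None:
        versions = self._rows[primary][secondary]
        versions.append((ts, value))
        versions.sort(key=lambda v: v[0], reverse=True)  # newest first

    def point_get(self, primary: str) -> Dict[str, str]:
        """Point query: latest value of every column in one row."""
        row = self._rows.get(primary, {})
        return {col: versions[0][1] for col, versions in row.items()}

    def prefix_scan(self, primary: str, prefix: str) -> Dict[str, str]:
        """Prefix query: latest values of columns whose key starts with prefix."""
        row = self._rows.get(primary, {})
        return {
            col: versions[0][1]
            for col, versions in sorted(row.items())
            if col.startswith(prefix)
        }

    def time_range(
        self, primary: str, secondary: str, start: int, end: int
    ) -> List[Version]:
        """Time-range query: versions with start <= timestamp < end."""
        versions = self._rows.get(primary, {}).get(secondary, [])
        return [(ts, value) for ts, value in versions if start <= ts < end]
```

Columns that were never written simply do not appear in a row, which mirrors the sparse behavior described above.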

Because we have over 4000 client tables in Mussel, each client table is mapped to a column family in HRegion instead of its own table, to reduce scalability challenges at the metadata management layer. Also, as HRegion is a column-based storage engine, each column family is stored in a separate file so they can be read and written independently.

For write requests, a Mussel storage node consumes the write request from Kafka and calls the HRegion put API to write the data directly. Each table can also customize its max version and TTL (time-to-live).

When we serve write requests with HRegion, another thing to consider is compaction. Compaction needs to run in order to clean up data that is deleted or has reached max version or max TTL. Also, when the MemStore in HRegion reaches a certain size, it is flushed to disk into a StoreFile. Compaction merges those files together in order to reduce disk seeks and improve read performance. On the other hand, while compaction is running, it causes higher CPU and memory usage and blocks writes to prevent JVM (Java Virtual Machine) heap exhaustion, which impacts the read and write performance of the cluster.
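
The cleanup that compaction performs can be sketched as follows. This is a simplified model, not HBase's compaction code: store files are plain dictionaries, delete markers are left out, and the function names and parameters are hypothetical.

```python
from typing import Dict, List, Tuple

Version = Tuple[int, str]  # (timestamp in seconds, value)


def merge_store_files(
    store_files: List[Dict[str, List[Version]]]
) -> Dict[str, List[Version]]:
    """Combine several flushed files into one so a read touches one file."""
    merged: Dict[str, List[Version]] = {}
    for store_file in store_files:
        for key, versions in store_file.items():
            merged.setdefault(key, []).extend(versions)
    return merged


def compact(
    versions: List[Version], now: int, max_versions: int, ttl_seconds: int
) -> List[Version]:
    """Drop versions past their TTL, then keep only the newest max_versions."""
    live = [(ts, value) for ts, value in versions if now - ts <= ttl_seconds]
    live.sort(key=lambda v: v[0], reverse=True)
    return live[:max_versions]
```

Even in this toy form, compaction has to read and rewrite every version of every key, which hints at why it competes with reads and writes for CPU and memory.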

Here we use Helix to divide the Mussel storage nodes for each logical shard into two types of resources: online nodes and batch nodes. For example, if we have 9 Mussel storage nodes for one logical shard, 6 of them are online nodes and 3 of them are batch nodes. The relationship between online and batch nodes is as follows:

  1. They both serve write requests
  2. Only online nodes serve read requests, and we rate limit compaction on online nodes to keep read performance good
  3. Helix schedules a daily rotation between online nodes and batch nodes. In the example above, it moves 3 online nodes to batch and 3 batch nodes to online so those 3 new batch nodes can perform full speed major compaction to clean up old data
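
The daily rotation can be sketched with two lists. This is an illustration of the 6 online / 3 batch example only: the function name, the node names, and the policy of rotating out the longest-serving online nodes first are assumptions, and the real scheduling is done by Helix.

```python
from typing import List, Tuple


def rotate(online: List[str], batch: List[str]) -> Tuple[List[str], List[str]]:
    """Swap the batch nodes with an equal number of online nodes.

    The online nodes at the front of the list (online for the longest)
    become the new batch nodes; the old batch nodes, freshly compacted,
    rejoin the end of the online list.
    """
    count = len(batch)
    new_batch = online[:count]
    new_online = online[count:] + batch
    return new_online, new_batch
```

With 9 nodes split 6 and 3, three consecutive rotations give every node one turn as a batch node, so under this policy each replica would get a full-speed major compaction every three days.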

With this change, we are now able to support both read and write with a single storage engine.

Supporting bulk load from data warehouse

We support two types of bulk load pipelines from the data warehouse to Mussel via Airflow jobs: merge type and replace type. Merge type means merging the data from the data warehouse with the data from previous writes with older timestamps in Mussel. Replace type means importing the data from the data warehouse and deleting all the data with previous timestamps.
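
One way to read the difference between the two types is sketched below. It is an interpretation for illustration, not the pipeline's code: the table is modeled as a dictionary of key to `(timestamp, value)`, all loaded rows are assumed to share one load timestamp, and the function name and `mode` strings are hypothetical.

```python
from typing import Dict, Tuple

Versioned = Tuple[int, str]  # (timestamp, value)


def bulk_load(
    existing: Dict[str, Versioned],
    incoming: Dict[str, str],
    load_ts: int,
    mode: str,
) -> Dict[str, Versioned]:
    """Apply a warehouse load stamped with load_ts to an existing table."""
    if mode == "merge":
        # Keep everything already stored, including older rows.
        result = dict(existing)
    elif mode == "replace":
        # Drop every row written before this load.
        result = {k: v for k, v in existing.items() if v[0] >= load_ts}
    else:
        raise ValueError(f"unknown bulk load mode: {mode}")
    for key, value in incoming.items():
        current = result.get(key)
        # A loaded row never overwrites a strictly newer write.
        if current is None or current[0] <= load_ts:
            result[key] = (load_ts, value)
    return result
```

In this reading, a merge load leaves untouched keys in place, while a replace load removes any key that the new load does not provide and that has not been written since.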

We use Spark to transform data from the data warehouse into HFile format and upload it to S3. Each Mussel storage node downloads the files and uses the HRegion bulkLoadHFiles API to load those HFiles into the column family.

With this bulk load pipeline, we can load just the delta data into the cluster instead of the full data snapshot every day. Before the migration, the user profile service needed to load about 4TB of data into the cluster per day. After, it only needs to load about 40–80GB, drastically reducing the cost and improving the performance of the cluster.
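
The 40–80GB figure lines up with the 1–2% daily delta quoted for Nebula earlier in the post. A quick check, assuming that ratio also applies to the user profile service and taking 1TB as 1000GB (both assumptions, not statements from the post):

```python
def delta_load_gb(full_snapshot_gb: float, delta_percent: float) -> float:
    """Size of the daily delta given the full snapshot size and delta share."""
    return full_snapshot_gb * delta_percent / 100


FULL_SNAPSHOT_GB = 4 * 1000  # about 4TB per day before the migration

low = delta_load_gb(FULL_SNAPSHOT_GB, 1)   # 1% of the snapshot
high = delta_load_gb(FULL_SNAPSHOT_GB, 2)  # 2% of the snapshot
print(f"daily delta: {low:.0f}-{high:.0f} GB instead of {FULL_SNAPSHOT_GB} GB")
```

Under those assumptions the daily load shrinks by a factor of roughly 50 to 100.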

In the last few years, Airbnb has come a long way in providing a high-quality derived data store for our engineers. The most recent key-value store, Mussel, is widely used within Airbnb and has become a foundational building block for any key-value based application with strong reliability, availability, scalability, and performance guarantees. Since its introduction, ~4000 tables have been created in Mussel, storing ~130TB of data in our production clusters without replication. Mussel has been working reliably to serve large amounts of read, write, and bulk load requests. For example, mussel-general, our largest cluster, has achieved >99.9% availability, average read QPS > 800k and write QPS > 35k, with average P95 read latency less than 8ms.

Even though Mussel serves our current use cases well, there are still many opportunities to improve. For example, we’re looking forward to providing read-after-write consistency to our customers. We also want to enable auto-scaling and repartitioning based on the traffic in the cluster. We’re looking forward to sharing more details about this soon.

Mussel is a collaborative effort of Airbnb’s storage team including: Calvin Zou, Dionitas Santos, Ruan Maia, Wonhee Cho, Xiaomou Wang, Yanhan Zhang.

Interested in working on the Airbnb Storage team? Check out this role: Staff Software Engineer, Distributed Storage