F1: A Distributed SQL Database That Scales - Google, 2013
F1 is a SQL database built on top of Spanner.
- Originally built to replace Google’s MySQL cluster used for AdWords
- My friend Jason Lucas of OrlyAtomics said it best, “NoSQL: Whom Shall We Screw?”
- If we have a distributed database, we often choose to relax consistency, which makes things very difficult for the software engineers
- There’s enough to deal with in ensuring the AdWords business logic is correct without also having engineers write logic to handle eventual consistency
- While F1 does increase the latency of many reads and writes, the authors have done a lot of work to mitigate this, including an ORM that guides engineers toward writing good client code
- Architecture
- Spanner on top of Colossus provides the KV store with distributed transactions
- F1 servers are “mostly stateless”
- Generally placed in the same DC as Spanner servers
- Client can usually communicate with any F1 server
- During a pessimistic transaction (i.e. the client holds locks) it must stay in communication with the same server
- F1 Slave Pool
- For execution of parts of distributed queries
- Shared
- Since F1 servers do not store data, adding and removing nodes does not require re-balancing of data
- Interesting idea that allows you to scale computation separately from storage or access
- Spanner
- Directory, a ‘database’ name in the conventional sense
- Each directory is made up of several fragments
- A collection of fragments in a directory is called a group
- Groups are replicated across DCs.
- Groups use Paxos
- One replica is deemed the Paxos leader for the group
- Some groups have read-only replicas that do not vote
- Pessimistic transactions use 2PL
- Transactions within a group do not require 2PC (only one leader)
- Transactions across groups require 2PC (on top of Paxos) and are slower
- Does not work well with 100s of participants
- Spanner allows for reading a snapshot of the data without locks
- Spanner guarantees that no current or future transaction will commit before the global safe timestamp
- Typically this timestamp is 5-10 seconds behind
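To make the lock-free snapshot read concrete, here is a minimal sketch of a multi-version store. It is a toy model, not Spanner's storage format: the class name, the integer timestamps, and the `safe_ts` value are all assumptions made for illustration.

```python
import bisect


class MultiVersionStore:
    """Toy multi-version key-value store (illustrative only)."""

    def __init__(self):
        # key -> list of (commit_timestamp, value), kept sorted by timestamp
        self._versions = {}

    def write(self, key, value, commit_ts):
        versions = self._versions.setdefault(key, [])
        bisect.insort(versions, (commit_ts, value))

    def snapshot_read(self, key, read_ts):
        """Return the newest value committed at or before read_ts, taking no locks."""
        versions = self._versions.get(key, [])
        timestamps = [ts for ts, _ in versions]
        idx = bisect.bisect_right(timestamps, read_ts)
        return versions[idx - 1][1] if idx else None


store = MultiVersionStore()
store.write("campaign:1", "budget=100", commit_ts=10)
store.write("campaign:1", "budget=250", commit_ts=20)

safe_ts = 15  # hypothetical global safe timestamp, lagging behind "now"
value_at_safe_ts = store.snapshot_read("campaign:1", safe_ts)
```

Because nothing can commit at or before the safe timestamp any more, a read at that timestamp always sees the same versions, which is why it needs no locks.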
- Data Model
- Schema
- F1 has a hierarchical schema
- Logical child tables can be interleaved with a physical parent table
- The child table must have a FK to its parent as part of its PK
- A row in the physical table is deemed the root row
- All data associated with the row in the parent and child tables are clustered together (generally the same Spanner server)
- This means that joins between a parent and child table can be satisfied with fast range queries on the same server
- Updates tend to be only on one or few servers requiring little coordination
- Protocol Buffers are first class citizens and are exposed in the SQL
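A rough sketch of why interleaving makes parent/child joins cheap: if a child's primary key starts with its parent's primary key, sorting by key places each root row directly before all of its descendants. The table names and key values below are made up for illustration, and a sorted Python list stands in for the real storage layer.

```python
import bisect

# Hypothetical rows: Customer(1), Campaign(1,3), AdGroup(1,3,6), and so on.
# Each child key is prefixed by its parent's key.
rows = {
    (1,): "Customer 1",
    (1, 3): "Campaign 1/3",
    (1, 3, 6): "AdGroup 1/3/6",
    (1, 4): "Campaign 1/4",
    (1, 4, 7): "AdGroup 1/4/7",
    (2,): "Customer 2",
    (2, 5): "Campaign 2/5",
}
sorted_keys = sorted(rows)


def scan_prefix(keys, prefix):
    """Return all keys starting with prefix using one contiguous range scan."""
    start = bisect.bisect_left(keys, prefix)
    end = start
    while end < len(keys) and keys[end][:len(prefix)] == prefix:
        end += 1
    return keys[start:end]


# Everything under root row (1,) is one contiguous run of keys.
customer_1_cluster = scan_prefix(sorted_keys, (1,))
campaign_1_3_cluster = scan_prefix(sorted_keys, (1, 3))
```

The join between a parent and its children becomes a single range read over adjacent keys instead of a lookup per child.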
- Indexes
- Transactional and fully consistent
- Stored in separate tables in Spanner
- I like this idea, reuse all the infra we already have!
- Keys consist of the index key followed by the PK of the indexed table
- Index keys can be extracted from Protos
- Local indexes have the root row’s PK as part of their prefix
- Local indexes can be stored as child tables
- Global indexes are not keyed by a root row (for example an index on Keyword)
- Usually large with high update rates
- Stored on multiple spanner servers
- Updating a single row in the index requires adding only one participant to 2PC
- However, I note that if this is a search index and you are adding a document with N words, the number of participants could increase quite a bit
- The authors encourage users to use small transactions when updating data in a global index
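To illustrate the concern about participant counts, here is a small sketch that counts how many index shards one transaction touches. The shard count, the use of `crc32` for placement, and the helper names are assumptions of mine; real placement is by key range across Spanner servers.

```python
import zlib

NUM_INDEX_SHARDS = 16  # hypothetical number of shards holding the global index


def shard_for(index_key):
    """Map an index key to a shard; crc32 stands in for real key-range placement."""
    return zlib.crc32(index_key.encode("utf-8")) % NUM_INDEX_SHARDS


def participants(index_keys):
    """Shards that must join the 2PC when one transaction writes these index entries."""
    return {shard_for(key) for key in index_keys}


def in_small_transactions(index_keys, batch_size):
    """Split one large update into transactions of at most batch_size keys each."""
    return [index_keys[i:i + batch_size] for i in range(0, len(index_keys), batch_size)]


keywords = ["running", "shoes", "trail", "waterproof", "lightweight", "sale"]
one_big_txn = participants(keywords)
per_small_txn = [participants(batch) for batch in in_small_transactions(keywords, 2)]
```

A single transaction over all keywords can involve up to one participant per keyword, while each small transaction is capped at `batch_size` participants, at the cost of losing atomicity across the batches.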
- Schema Changes
- This seems like a heroic effort to me
- In AdWords, many schema changes happen each day!
- They can’t just lock a table
- Their scheme is non-blocking
- Their algorithm is summarized as follows:
- Only two schemas are active in the cluster at a time
- Each server is using either the previous or new schema
- Leases are used to enforce this
- Each schema change is divided into a series of phases where each consecutive pair is compatible
- They may use MapReduce to backfill index entries for a new index
- This index may be invisible until the index is current
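The "at most two adjacent schemas" rule can be sketched as an invariant check over a rolling upgrade. The phase names below are illustrative assumptions rather than something taken from these notes, and the one-server-at-a-time rollout stands in for the lease mechanism.

```python
# Illustrative phases for adding an index; each consecutive pair is assumed compatible.
PHASES = ["absent", "delete-only", "write-only", "public"]


def cluster_is_safe(server_phases):
    """True if the servers span at most two schema phases and those are adjacent."""
    positions = {PHASES.index(phase) for phase in server_phases}
    return max(positions) - min(positions) <= 1


def roll_out(num_servers):
    """Move every server through each phase in turn, checking the invariant each step."""
    servers = [PHASES[0]] * num_servers
    history = []
    for next_phase in PHASES[1:]:
        for i in range(num_servers):
            servers[i] = next_phase
            if not cluster_is_safe(servers):
                raise RuntimeError("servers are more than one phase apart")
            history.append(tuple(servers))
    return servers, history


final_state, history = roll_out(num_servers=3)
```

The point of the sketch is that no server ever jumps ahead while another is still two phases behind, so any two servers operating at the same time only ever disagree by one compatible step.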
- Transactions
- An F1 transaction is a set of reads followed by an optional single write
- Types:
- Snapshot Transactions
- Read only
- Uses a fixed timestamp
- Reads are repeatable
- By default it uses the global safe timestamp
- Can use a client specified timestamp
- This could require many RPCs
- This is the default type for SQL queries and MapReduces
- Pessimistic Transactions
- These are Spanner transactions
- Acquires locks (2PL)
- Locks can either be shared or exclusive
- Optimistic Transactions
- Read phase followed by a write phase
- Read phase holds no locks
- Read phase can take arbitrarily long
- In the read phase, F1 returns the last modified timestamp with each row
- The timestamp for a row is stored in a hidden lock column
- The client library returns these timestamps to the F1 server
- If the timestamps differ from the current timestamps at the time of commit the transaction is aborted
- Benefits
- Bad clients don’t hold any locks so they don’t hurt the rest of the system
- They can be easily retried on the F1 server to hide transient errors
- Transaction state is kept on the client, therefore the client can failover to another F1 server if it chooses
- Clients can read values outside of a transaction and then write at some later time (great for MapReduce)
- Drawbacks
- Insertion Phantoms
- Insertions can happen without being noticed because a row that does not exist yet has no timestamp that can be passed to the client at read time
- They use parent-table locks to avoid phantoms
- Like many optimistic locking strategies, throughput goes down if there is high contention
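The optimistic read/validate/commit cycle described above can be sketched in a few lines. This is a single-process toy under my own assumptions (a logical counter as the clock, an in-memory dict as the table); in F1 the validation and write happen inside a short-lived Spanner transaction.

```python
class ConflictError(Exception):
    """Raised when a row read by the transaction changed before commit."""


class Table:
    def __init__(self):
        self._rows = {}   # key -> (value, last_modified_ts)
        self._clock = 0   # logical commit clock

    def read(self, key):
        """Lock-free read: return (value, last_modified_ts) for the row."""
        return self._rows[key]

    def commit(self, read_timestamps, writes):
        """Abort if any row read has been modified since; otherwise apply writes."""
        for key, seen_ts in read_timestamps.items():
            if self._rows[key][1] != seen_ts:
                raise ConflictError(key)
        self._clock += 1
        for key, value in writes.items():
            self._rows[key] = (value, self._clock)
        return self._clock


table = Table()
table.commit({}, {"ad:1": "draft"})

# Client A reads and holds on to the timestamp; no locks are held meanwhile.
value, seen_ts = table.read("ad:1")

# Client B commits a change to the same row in between.
table.commit({}, {"ad:1": "live"})

# Client A's commit now fails validation and must be retried.
aborted = False
try:
    table.commit({"ad:1": seen_ts}, {"ad:1": "paused"})
except ConflictError:
    aborted = True
```

Note that the sketch also shows the phantom problem: a key that was absent at read time has no timestamp to validate, so a concurrent insert would go undetected.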
- Locking Granularity
- Row level locking by default
- Additional lock columns can be added in the schema to cover a subset of the columns in the row
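As a small illustration of finer-grained locking, the sketch below treats two updates to the same row as conflicting only when the columns they touch share a lock column. The column and lock names are hypothetical.

```python
# Hypothetical mapping from data columns to the lock column that covers them.
LOCK_COLUMN_FOR = {
    "name": "default_lock",
    "budget": "default_lock",
    "status": "status_lock",
}


def lock_columns(columns):
    """Lock columns covering the given data columns."""
    return {LOCK_COLUMN_FOR[column] for column in columns}


def conflicts(txn_a_columns, txn_b_columns):
    """Two transactions on the same row conflict only if they share a lock column."""
    return bool(lock_columns(txn_a_columns) & lock_columns(txn_b_columns))
```

With this mapping, concurrent updates to `name` and `status` on one row can both commit, while updates to `name` and `budget` still conflict.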
- Change History
- While the Java libraries for accessing data in the AdWords MySQL cluster added change logs, Python scripts and other clients didn’t always do proper change logs
- Change history is a “first-class feature”
- Each F1 transaction creates one or more ChangeBatch protos that contain the PK and before and after values of each column
- These records are added as children of their root table
- Thus they are close to the data being tracked, requiring no extra participants in the transaction
- The records have pointers to each other if multiple rows were included in the same transaction
- F1 publishes the change history for others to subscribe to
- The authors discuss a cool use of this feature, a cache
- An in-memory cache includes the timestamp of the data it caches
- When a user commits data to F1 and subsequently reads from the cache (perhaps to show the data on the next page), it can request the data from the cache with the commit timestamp
- If the cache is out of date, it can read the change history to catch up
- Less intensive than a full refresh and a simple cache invalidation method
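Here is a minimal sketch of the cache idea, assuming an append-only change log ordered by commit timestamp. The `ChangeLog` and `Cache` classes and their methods are my own invention for illustration, not F1's API.

```python
class ChangeLog:
    """Stand-in for the published change history, ordered by commit timestamp."""

    def __init__(self):
        self.entries = []  # (commit_ts, key, new_value)

    def append(self, commit_ts, key, value):
        self.entries.append((commit_ts, key, value))

    def since(self, ts):
        return [entry for entry in self.entries if entry[0] > ts]


class Cache:
    def __init__(self, log):
        self.log = log
        self.data = {}
        self.as_of = 0  # timestamp through which the cache is known to be current

    def get(self, key, min_ts):
        """Return key's value, first catching up if the cache is older than min_ts."""
        if self.as_of < min_ts:
            for ts, changed_key, value in self.log.since(self.as_of):
                self.data[changed_key] = value
                self.as_of = max(self.as_of, ts)
        return self.data.get(key)


log = ChangeLog()
cache = Cache(log)

log.append(5, "ad:1", "v1")           # a user commits at timestamp 5
first_read = cache.get("ad:1", 5)     # the next page load asks for data as of 5

log.append(9, "ad:1", "v2")           # a later commit at timestamp 9
stale_ok = cache.get("ad:1", 5)       # the cache is already current through 5
caught_up = cache.get("ad:1", 9)      # asking for timestamp 9 forces a catch-up
```

The cache only replays the changes it has missed, so the work is proportional to the number of recent writes rather than to the size of the cached data.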
- Client Design
- ORM
- I believe this section illustrates that thinking hard about good abstractions can really improve performance
- In a way typical of many published Google projects, the authors limit the API to provide better guarantees
- NoSQL Interface
- ORM uses the NoSQL interface under the hood
- Clients can use it directly
- Can sometimes be easier to read and write data
- SQL Interface
- An extended SQL
- Query Processing
- Can either be centrally executed for low latency or distributed for high parallelism
- To maximize pipelining, query plan operators stream results to the next operators as soon as possible
- Distributed Queries
- Always uses snapshot transactions
- Good for OLAP style
- Tasks are spread across the slave pool
- All the data is remote (on Spanner/Colossus)
- Network latency is mitigated by pipelining and batching
- In this case, since there are many disks involved in the read, it’s fine to ignore conventional DBMS wisdom and issue many data accesses at the same time
- They see near-linear speedup with this approach (until the storage is overloaded)
- Their lookup join requests 50 MB of data and then looks up all the keys in the inner table simultaneously!
- WOAH!
- Their network fabric is such that servers on different racks can communicate at full link speed
- Hash Join Partitioning
- Repartitions both inputs with a hash function on the join key
- Each worker deals with a shard of the data
- The query planner determines the smallest input and each shard loads it into an in-memory hash table
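The repartitioned hash join can be sketched in a single process as follows. Workers are simulated with a loop, rows are plain tuples, and the function names and example tables are assumptions for illustration; F1 streams rows between slave-pool workers instead.

```python
from collections import defaultdict


def partition(rows, key_index, num_workers):
    """Send each row to the worker chosen by hashing its join key."""
    shards = defaultdict(list)
    for row in rows:
        shards[hash(row[key_index]) % num_workers].append(row)
    return shards


def hash_join(left, right, left_key, right_key, num_workers=4):
    """Repartition both inputs on the join key, then join each shard independently."""
    left_shards = partition(left, left_key, num_workers)
    right_shards = partition(right, right_key, num_workers)
    build_is_left = len(left) <= len(right)  # the smaller input becomes the hash table
    output = []
    for worker in range(num_workers):
        l_rows, r_rows = left_shards[worker], right_shards[worker]
        build, probe = (l_rows, r_rows) if build_is_left else (r_rows, l_rows)
        build_key, probe_key = (left_key, right_key) if build_is_left else (right_key, left_key)
        table = defaultdict(list)
        for row in build:
            table[row[build_key]].append(row)
        for row in probe:
            for match in table.get(row[probe_key], []):
                # Always emit left columns first, regardless of which side was built.
                output.append(match + row if build_is_left else row + match)
    return output


campaigns = [(1, "spring"), (2, "summer")]        # (campaign_id, name)
clicks = [(10, 1), (11, 1), (12, 2), (13, 3)]     # (click_id, campaign_id)
joined = sorted(hash_join(campaigns, clicks, left_key=0, right_key=1))
```

Since matching keys always hash to the same worker, each worker only needs its own shard of the smaller input in memory.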
- Distributed Aggregation
- Buffer as much as possible in small buffers and then repartition the data by group key for final aggregation
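A sketch of the two-step aggregation, assuming a SUM over `(group_key, value)` rows: each worker pre-aggregates in a small bounded buffer, and the partial results are then repartitioned by group key for the final sum. The buffer limit and function names are illustrative choices of mine.

```python
from collections import defaultdict


def partial_aggregate(rows, buffer_limit):
    """Pre-aggregate (key, value) rows in a bounded buffer, flushing when it is full."""
    buffer = {}
    for key, value in rows:
        if key not in buffer and len(buffer) >= buffer_limit:
            yield from buffer.items()
            buffer = {}
        buffer[key] = buffer.get(key, 0) + value
    yield from buffer.items()


def final_aggregate(partials_per_worker, num_reducers):
    """Repartition partial sums by group key, then finish the sum on each reducer."""
    partitions = defaultdict(list)
    for partials in partials_per_worker:
        for key, value in partials:
            partitions[hash(key) % num_reducers].append((key, value))
    totals = {}
    for rows in partitions.values():
        sums = defaultdict(int)
        for key, value in rows:
            sums[key] += value
        totals.update(sums)
    return totals


worker_1 = [("a", 1), ("b", 2), ("a", 3), ("c", 4)]
worker_2 = [("b", 5), ("c", 1)]
partials = [list(partial_aggregate(rows, buffer_limit=2)) for rows in (worker_1, worker_2)]
totals = final_aggregate(partials, num_reducers=3)
```

A group may be emitted more than once by the same worker when the buffer overflows, which is fine because the final step sums all partials for a key on one reducer.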
- There is no checkpointing which means that if any component fails the entire query could fail
- Partitioned Consumers
- The client might not be able to receive data at the rate F1 can produce it
- Multiple clients can consume the result stream in parallel
- Often used in MapReduce jobs
- If a client doesn’t keep up, this can cause the F1 slaves to block, slowing other transactions
- Eventually they are thinking of implementing a disk based buffer
- You can join on fields in a Protocol Buffer as part of the SQL
- Misc
- MapReduce clients are allowed to communicate directly with Spanner for performance
- Read only replicas allow for OLAP load to be on separate servers
- 3-way replication is not enough
- If one replica is offline, F1 cannot handle another failure, as Paxos requires a majority of replicas
- In practice they use 5 replicas (two east coast, two west coast and one centrally)
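The replica-count argument is simple majority arithmetic, worked through below. The helper names are mine.

```python
def majority(num_replicas):
    """Smallest number of replicas that forms a Paxos majority."""
    return num_replicas // 2 + 1


def tolerated_failures(num_replicas):
    """How many replicas can be unavailable while a majority can still be formed."""
    return num_replicas - majority(num_replicas)


def spare_after_outage(num_replicas, already_offline=1):
    """Further failures survivable once some replicas are already offline."""
    return tolerated_failures(num_replicas) - already_offline
```

With 3 replicas a majority is 2, so one replica being offline leaves no room for a second failure; with 5 replicas a majority is 3, so one more failure can still be absorbed.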