Introducing Tungsten On-Disk Queues for Parallel Replication

Tungsten Replicator has offered shard-based parallel replication to slaves since late 2010.  The initial implementation uses in-memory queues.  Working purely in memory keeps latency low and throughput high.  On the other hand, it consumes valuable RAM and forces us to buffer all in-flight transactions, which greatly limits the span of time permissible between the slowest and fastest shard.

Hence our newest improvement:  on-disk parallel queues.  In this article I will cover how parallel replication works in general, how on-disk queues help with parallel replication, and then show how to set them up from the latest builds.

First, let's review the basic mechanics.  Parallel replication is really "parallel apply," which means taking a stream of serialized transactions, splitting them into separate streams, and applying them in parallel to a slave DBMS.  In the Tungsten pipeline architecture, we implement this kind of flow using a combination of stages and stores.  One stage reads transactions from the persistent transaction history log (aka THL) to a "parallel store."  The parallel store splits the stream into a set of queues.  The next stage extracts from those queues and applies to the slave.   It looks like the following picture:

From a conceptual point of view the incoming thl-to-q task thread performs an indexing function.  It guides construction of the queues read by task threads in the q-to-dbms stage; a sketch of the shard-to-channel assignment behind this indexing appears a little further down.  Within this framework there are many ways to feed events into the parallel queues.  In the case of on-disk queues there are two obvious design options.
  1. Read transactions out of the THL and split them into separate transaction logs, one per parallel queue.  This is very similar to the in-memory approach, except that the queues now live on disk (or SSD or whatever storage you pick).  It can be implemented without adding any extra threads to the parallel store (a simplified sketch of this option appears after the trade-off discussion below). 
  2. Leave all data in THL.  Add a cursor for each parallel queue that scans the THL and picks only the transactions that belong in that parallel queue.   This requires extra threads to do the scans, hence is more complex to implement.  
Both approaches achieve the primary goal, which is to keep the transactions in storage until we actually need them and thereby minimize memory usage.  This in turn solves a major problem, namely that individual shards can now be many thousands or even millions of transactions apart in the serial history.  Beyond that, it is not completely obvious which approach is better.
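
Before weighing the options, here is a minimal sketch of the shard-to-channel assignment mentioned above.  It assumes a hash-based mapping from shard ID to channel; the class and method names are my own invention and the real replicator's partitioning logic is pluggable, so treat this strictly as an illustration.

// Illustrative only: assign a shard to one of N channels by hashing its name.
public class ShardToChannelSketch {
    static int channelFor(String shardId, int channels) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(shardId.hashCode(), channels);
    }

    public static void main(String[] args) {
        int channels = 10;   // matches --channels=10 in the install example below
        for (String shard : new String[] {"db_customers", "db_orders", "db_customers"}) {
            System.out.println(shard + " -> channel " + channelFor(shard, channels));
        }
    }
}

The important property is that every transaction from a given shard maps to the same channel, so per-shard ordering is preserved no matter how the queues themselves are stored.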

Weighing the two options: option 1 isolates the reads to individual files.  This minimizes overall I/O at the cost of making it more random, since reads and writes are spread over many files.  Option 2 avoids extra writes and keeps I/O sequential, but introduces a bunch of threads doing the equivalent of table scans across the same patch of storage.  Up to a point we can assume that pages are coming out of the OS page cache rather than storage, but this assumption will not hold for all operating environments and workloads.  The only way to prove the trade-offs is to implement and test.  (We may end up implementing both.)
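
As a concrete, heavily simplified illustration of option 1, the sketch below appends each incoming transaction to one log file per channel so that each applier can read its own file.  The file layout, names, and record format are invented and have nothing to do with the real THL on-disk format.

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative only: option 1 in miniature.  Transactions are routed by shard
// and appended to a per-channel log file on disk.
public class SplitLogSketch {
    public static void main(String[] args) throws IOException {
        int channels = 10;
        Path dir = Files.createTempDirectory("per-channel-logs");

        // (seqno, shard) pairs standing in for transactions read from the THL.
        String[][] txns = {{"1", "db_customers"}, {"2", "db_orders"}, {"3", "db_customers"}};
        for (String[] txn : txns) {
            int channel = Math.floorMod(txn[1].hashCode(), channels);
            Path logFile = dir.resolve("channel-" + channel + ".log");
            try (BufferedWriter out = Files.newBufferedWriter(logFile,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
                out.write(txn[0] + "\t" + txn[1]);
                out.newLine();
            }
        }
        System.out.println("Per-channel logs written under " + dir);
    }
}

Each applier would then tail only its own file, which keeps reads isolated but spreads writes across many files, exactly the random-versus-sequential trade-off described above.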

After some discussion internally at Continuent as well as with Harrison Fisk from Facebook, we picked option 2 for the initial implementation.  Here is a diagram that shows how it works.


Here is a quick tour of the implementation in the Java class THLParallelQueue.  This class maintains an in-memory blocking queue for each channel.  Each queue has a corresponding read thread that scans the THL and places matching events into the queue.  The THLParallelQueue class synchronizes the read threads and handles issues like serialization and clean shutdown.  Some memory is still consumed for the per-channel queues, but they are quite small and hold far less than buffering all in-flight transactions in memory.
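
To make that concrete, here is a simplified model of a per-channel read task: it scans the shared log, keeps only the transactions assigned to its channel, and buffers them in a small bounded queue.  The class below is my own invention for illustration; the real THLParallelQueue and its THLParallelReadTask threads handle far more, including synchronization, serialization, and clean shutdown.

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative only: a per-channel read task that scans a shared log and
// enqueues just the transactions belonging to its channel.  Everything else
// is read and skipped, much like the "discarded" counter shown by trepctl.
public class ChannelReadTaskSketch implements Runnable {
    record Txn(long seqno, String shardId) {}

    private final List<Txn> sharedLog;   // stand-in for the on-disk THL
    private final int channel;
    private final int totalChannels;
    // Small bounded buffer: this, not the whole backlog, is what stays in RAM.
    private final BlockingQueue<Txn> buffer = new ArrayBlockingQueue<>(10);

    ChannelReadTaskSketch(List<Txn> sharedLog, int channel, int totalChannels) {
        this.sharedLog = sharedLog;
        this.channel = channel;
        this.totalChannels = totalChannels;
    }

    BlockingQueue<Txn> buffer() {
        return buffer;
    }

    @Override
    public void run() {
        try {
            for (Txn txn : sharedLog) {
                if (Math.floorMod(txn.shardId().hashCode(), totalChannels) == channel) {
                    buffer.put(txn);   // blocks when the applier falls behind
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

The applier stage simply takes events from the buffer.  Because the buffer is bounded (ten events here, loosely analogous to the maxSize value in the trepctl output below), memory stays small and roughly constant no matter how many transactions separate the fastest and slowest shards.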

So much for the theoretical description. If you would like to test on-disk queues yourself, you can get started in three steps.
  1. Download the latest nightly build of Tungsten Replicator to /tmp. 
  2. Untar and cd into the resulting release directory.  
  3. Install using the new tungsten-installer as described by Giuseppe Maxia.
Here is an example of set-up commands.  My test system uses logos1 as the master and logos2 as the slave.  The MySQL DBMS build is Percona Server 5.1.54.  Tungsten is installed in /opt/tungsten.

# Download and unpack build.
cd /tmp
wget --no-check-certificate https://s3.amazonaws.com/files.continuent.com/builds/nightly/tungsten-2.0-snapshots/tungsten-replicator-2.0.4-154.tar.gz
# Untar.
tar -xvzf tungsten-replicator-2.0.4-154.tar.gz
cd tungsten-replicator-2.0.4-154
# Set up and start replicators. 
export TUNGSTEN_HOME=/opt/tungsten
/tmp/tungsten-replicator-2.0.4-154/tools/tungsten-installer \
  --master-slave  \
  --master-host=logos1  \
  --datasource-user=tungsten  \
  --datasource-password=secret  \
  --service-name=percona \
  --home-directory=${TUNGSTEN_HOME} \
  --cluster-hosts=logos1,logos2 \
  --relay-directory=${TUNGSTEN_HOME}/relay-logs \
  --datasource-log-directory=/usr/local/percona-5.1.54/data \
  --thl-directory=${TUNGSTEN_HOME}/thl-logs \
  --channels=10 \
  --svc-parallelization-type=disk \
  --start-and-report                                  

Note the --svc-parallelization-type=disk option, which selects disk queues ("memory" is the other value), and the --channels option, which sets the number of parallel channels.  You can confirm the right queue is installed by running the following command against any slave replicator.  You should see the storage class THLParallelQueue in the status output.

$ trepctl -host logos2 status -name stores
Processing status command (stores)...
NAME                VALUE
----                -----
doChecksum        : false
logDir            : /opt/rhodges4/thl-logs/percona
logFileRetention  : 7d
logFileSize       : 100000000
maximumStoredSeqNo: 0
minimumStoredSeqNo: 0
name              : thl
storeClass        : com.continuent.tungsten.replicator.thl.THL
NAME                VALUE
----                -----
criticalPartition : -1
discardCount      : 0
eventCount        : 1
headSeqno         : 0
maxSize           : 10
name              : parallel-queue
queues            : 10
serializationCount: 0
serialized        : false
stopRequested     : false
store.0           : THLParallelReadTask task_id=0 thread_name=store-thl-0 hi_seqno=0 lo_seqno=0 read=1 discarded=1 events=0
store.1           : THLParallelReadTask task_id=1 thread_name=store-thl-1 hi_seqno=0 lo_seqno=0 read=1 discarded=1 events=0
store.2           : THLParallelReadTask task_id=2 thread_name=store-thl-2 hi_seqno=0 lo_seqno=0 read=1 discarded=1 events=0
store.3           : THLParallelReadTask task_id=3 thread_name=store-thl-3 hi_seqno=0 lo_seqno=0 read=1 discarded=1 events=0
store.4           : THLParallelReadTask task_id=4 thread_name=store-thl-4 hi_seqno=0 lo_seqno=0 read=1 discarded=1 events=0
store.5           : THLParallelReadTask task_id=5 thread_name=store-thl-5 hi_seqno=0 lo_seqno=0 read=1 discarded=1 events=0
store.6           : THLParallelReadTask task_id=6 thread_name=store-thl-6 hi_seqno=0 lo_seqno=0 read=1 discarded=1 events=0
store.7           : THLParallelReadTask task_id=7 thread_name=store-thl-7 hi_seqno=0 lo_seqno=0 read=1 discarded=1 events=0
store.8           : THLParallelReadTask task_id=8 thread_name=store-thl-8 hi_seqno=0 lo_seqno=0 read=1 discarded=1 events=0
store.9           : THLParallelReadTask task_id=9 thread_name=store-thl-9 hi_seqno=0 lo_seqno=0 read=1 discarded=0 events=0
storeClass        : com.continuent.tungsten.replicator.thl.THLParallelQueue
syncEnabled       : true
syncInterval      : 2000
Finished status command (stores)...


Now for some fine print.  On-disk queues are implemented but are still undergoing QA, so there are bugs.  The most important problem is performance: latency is a lot higher than expected on some of our systems, which I suspect is due to an as-yet undiagnosed bug.  If you try on-disk queues now you can expect to hit a few problems.  On the other hand, we take any and all feedback quite seriously, so this is your chance to provide input and help guide the final implementation.  Please log issues on the Tungsten Replicator issue tracker or bring up questions on the tungsten-discuss mailing list.

Finally, if you would like to learn more about the parallel queue implementation, check out the design documentation on our wiki as well as the source code.  They are both pretty readable.