The first way to get good performance with Tungsten is to have the right workload. As explained in an earlier article on this blog, Tungsten parallel replication works by replicating independent databases (aka shards) in parallel. Here is a picture that summarizes what is going on.
If you have a lot of schemas, if the updates are distributed evenly across schemas, and if you don't have many dependencies between schemas that require full serialization, parallel replication can speed things up significantly for I/O-bound workloads. For example, Tungsten runs three times faster than MySQL native replication on large datasets when the slave is catching up to the master following mysqld restart.
Catch-up is a famous slave lag case and one where Tungsten can be quite helpful. (I think we will be faster in the future, but this is a good start.) Nevertheless, there's a chance you'll need to do a bit of tuning to see such benefits yourself.
Tungsten currently uses a structure called a parallel queue to enable parallelization. The parallel queue typically sits at the end of a replicator pipeline in front of the parallel apply threads, as shown in the following handy diagram.
One key to getting decent parallel replication performance is to watch the parallel queue in operation. In Tungsten Replicator 2.0.1 we introduced a new status command
trepctl status -name stores
that goes a long way to help diagnose how well parallel replication is performing. Here's a typical example using a 6 channel queue store. $ trepctl status -name stores
Processing status command (stores)...
NAME VALUE
---- -----
criticalPartition : -1
discardCount : 0
eventCount : 3217
maxSize : 1000
name : parallel-queue
queues : 6
serializationCount: 1
serialized : false
stopRequested : false
store.queueSize.0 : 0
store.queueSize.1 : 480
store.queueSize.2 : 310
store.queueSize.3 : 1000
store.queueSize.4 : 739
store.queueSize.5 : 407
storeClass : com.continuent.tungsten.enterprise.replicator.store.ParallelQueueStore
storeSize : 2936
syncEnabled : true
syncInterval : 100
Finished status command (stores)...
The two most important things to look at are distribution of transactions across queues and serialization. Let's start with transaction distribution. In this particular example we were running a parallel queue with 6 channels but only 5 databases. The distribution therefore looks pretty good. One queue is empty but the other have a fairly even distribution of transactions.
Notice that one queue has exactly 1000 transactions. In Tungsten Replicator 2.0.1 the parallel queue has a maximum size parameter (maxSize), which is set to 1000 for this example run. Once an individual queue hits the maxSize limit, the entire parallel queue blocks. It is not uncommon to see one queue blocking in this way if the replicator is catching up, which is exactly what is happening here. In fact, if the queues are all empty it is possible Tungsten is somehow not supplying transactions to the queue fast enough. That is not a problem here.
Bad workloads on the other hand tend to have a lot of transactions in one or two queues and few or none in all the rest. The following is an example of a possibly bad distribution.
$ trepctl status -name stores
Processing status command (stores)...
NAME VALUE
---- -----
...
store.queueSize.0 : 0
store.queueSize.1 : 4
store.queueSize.2 : 3
store.queueSize.3 : 972
store.queueSize.4 : 0
store.queueSize.5 : 15
...
Finished status command (stores)...
If you see such skewed distributions persistently, you may want to try to adjust the queue partitioning using the
shard.list
file. The default parallel queue partitioning algorithm hashes shards into channels. This does not always gives optimal performance if your shards mostly happen to hash into the same channel. The other possibility is that the workload is just badly distributed across databases.You can decide whether the workload or partitioning is at fault using the
trepctl status -name shards
command. Here's an example.
$ ./trepctl status -name shards
Processing status command (shards)...
NAME VALUE
---- -----
appliedLastEventId: 000007:0000000000000384;20
appliedLastSeqno : 1471201
appliedLatency : 0.0
eventCount : 6
shardId : #UNKNOWN
stage : d-pq-to-dbms
NAME VALUE
---- -----
appliedLastEventId: 000005:0000000326365895;41
appliedLastSeqno : 1470999
appliedLatency : 0.0
eventCount : 311605
shardId : db1
stage : d-pq-to-dbms
NAME VALUE
---- -----
appliedLastEventId: 000005:0000000326512277;95
appliedLastSeqno : 1471200
appliedLatency : 0.0
eventCount : 298522
shardId : db2
stage : d-pq-to-dbms
...
This shows that the distribution of transactions between the db1 and db2 databases is pretty even. If you have many databases with roughly even values in the eventCount parameter, the workload is well suited for parallelization. In that case you may want to assign shards explicitly in the shard.list file if you don't like the distribution in the parallel queue.
Meanwhile, the previous example shows an example of another potential problem. We also see counts for #UNKNOWN, which is a special shard ID that means "I could not tell what schema this is." #UNKOWN transactions can occur if Tungsten cannot parse a SQL statement properly or there is a transaction that updates multiple schemas. In either case, Tungsten serializes the parallel queue.
However it occurs, serialization is a performance killer because it means we have to block the parallel queue until all parallel transactions complete, execute one or more transactions serially, and then reopen the parallel queue. You can see how often this is happening from the serializationCount value on trepctl status -name stores. For many workloads a serializationCount value that is more than a few percent of the number in eventCount means the entire transaction stream is effectively serialized.
If the serialization is occurring due to #UNKNOWN shards, you may be able to improve things using a new value in the replicator service properties file that was added in version 2.0.1. It controls whether we assign the shard ID using the default schema even if Tungsten cannot tell from the SQL command what you are doing.
# Policy for shard assignment based on default database. If 'stringent', use
# default database only if SQL is recognized. For 'relaxed' always use the
# default database if it is available.
replicator.shard.default.db=relaxed
Setting the parameter to
relaxed
can help quite a bit if the problem is due to unusual SQL that confuses the parser. On one workload we were able to reduce the serializationCount from about 10% of transactions to 0% in this way. We then saw the expected speed-up from parallel replication.