Customized Data Movement with Tungsten Replicator Pipelines

Have you ever run into a problem where MySQL replication did 95% of what you needed but not the remaining 5%?  Hacking the binlog is always a possibility, but it typically looks like this example.  Not a pretty sight.  Wouldn't it be easier if replication were a bunch of building blocks you could recombine to create custom replicator processes?

Tungsten 1.3 has a new feature called pipelines that allows you to do exactly that.  A pipeline consists of one or more stages that tie together generic components to extract, filter, store, and apply events (Tungsten parlance for transactions).  Each stage has a processing thread, so multi-stage pipelines can process data independently and without blocking.  Stages also take care of important but tedious issues like tracking their transactional state, so Tungsten can restart without losing events or applying them twice.

Here is a picture of how a pipeline is put together.


When Tungsten Replicator starts, it loads a pipeline corresponding to its role, for example master or slave.  The preceding picture shows a slave pipeline consisting of two stages.  The first stage pulls replicated events over the network from a master Tungsten Replicator and stores them in a local transaction history log, which we call the THL.  The second stage extracts the stored events and applies them to the database.  This pipeline is analogous to the I/O and SQL threads on a MySQL slave.

Where Tungsten departs from MySQL and most other replicators in a big way is that pipelines, and hence replication flows, are completely configurable.  The configuration is stored in the file replicator.properties.  Here are the property settings that create the slave pipeline.  Note how the role is the name of a pipeline; it determines which pipeline to run when the replicator goes online.

# Replicator role. 
replicator.role=slave

...
# Generic pipelines.
replicator.pipelines=master,slave,direct

...
# Slave pipeline has two stages:  extract from remote THL to local THL;
# extract from local THL and apply to DBMS.
replicator.pipeline.slave=remote-to-thl,thl-to-dbms
replicator.pipeline.slave.stores=thl
replicator.pipeline.slave.syncTHLWithExtractor=false

replicator.stage.remote-to-thl=com.continuent.tungsten.replicator.pipeline.SingleThreadStageTask
replicator.stage.remote-to-thl.extractor=thl-remote
replicator.stage.remote-to-thl.applier=thl-local

replicator.stage.thl-to-dbms=com.continuent.tungsten.replicator.pipeline.SingleThreadStageTask
replicator.stage.thl-to-dbms.extractor=thl-local
replicator.stage.thl-to-dbms.applier=mysql
replicator.stage.thl-to-dbms.filters=mysqlsessions

The syntax is not beautiful but it is quite flexible.  Here is what this definition means.
  1. This replicator knows about three pipelines named master, slave, and direct.
  2. The slave pipeline has two stages called remote-to-thl and thl-to-dbms and a store called thl.  It has a property named syncTHLWithExtractor, which must be set to false for slaves.  (We need to change that name to something like 'isMaster'.)
  3. The remote-to-thl stage extracts from thl-remote.  This extractor reads events over the network from a remote replicator.  The stage applies to thl-local, which is an applier that writes events to the local transaction history log.
  4. The thl-to-dbms stage pulls events from the local log and applies them to the database.  Note that in addition to an applier and extractor, there is also a filter named mysqlsessions.  This filter looks at events and modifies them to generate a pseudo-session ID, which is necessary to avoid problems with temporary tables when applying transactions from multiple sessions.  It is just one of a number of filters that Tungsten provides.

Components like appliers, filters, extractors, and stores have individual configuration elsewhere in the replicator.properties file.  Here's an example of the configuration for a MySQL binlog extractor.  (Note that Tungsten 1.3 can now read binlogs directly as files or relay them from a master server.)

# MySQL binlog extractor properties. 
replicator.extractor.mysql=com.continuent.tungsten.replicator.extractor.mysql.MySQLExtractor
replicator.extractor.mysql.binlog_dir=/var/log/mysql
replicator.extractor.mysql.binlog_file_pattern=mysql-bin
replicator.extractor.mysql.host=logos1-u1
replicator.extractor.mysql.port=3306
replicator.extractor.mysql.user=${replicator.global.db.user}
replicator.extractor.mysql.password=${replicator.global.db.password}
replicator.extractor.mysql.parseStatements=true

# When using relay logs we download from the master into binlog_dir.  This
# is used for off-board replication.
#replicator.extractor.mysql.useRelayLogs=false
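
Filters are declared in the same fashion.  Here is a rough sketch of what a definition for the mysqlsessions filter could look like; the class name below is an assumption for illustration, so check the replicator.properties.mysql template for the exact class that ships with Tungsten.

# MySQL session filter (the class name here is an assumption for illustration).
replicator.filter.mysqlsessions=com.continuent.tungsten.replicator.filter.MySQLSessionSupportFilter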

The thing that makes pipelines really flexible is that the interfaces are completely symmetric.  Components that extract events from the MySQL binlog or from a transaction history log have identical APIs.  Similarly, the APIs to apply events are the same whether you are storing events in a log or applying them to a slave database.  Pipelines can tie together practically any sequence of extract, filter, and apply operations you can think of.
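
To make the symmetry concrete, here is a minimal sketch in Java of what such a pair of interfaces could look like.  The names below (ReplEvent, EventExtractor, EventApplier, StageTask) are illustrative assumptions rather than the actual Tungsten classes; the real SingleThreadStageTask referenced in the properties above plays roughly the role that StageTask plays here.

// Illustrative sketch only: these names are assumptions, not the real Tungsten API.
interface ReplEvent {
    long getSeqno();                               // global sequence number of the event
}

interface EventExtractor {
    ReplEvent extract() throws Exception;          // pull the next event from the source
}

interface EventApplier {
    void apply(ReplEvent event) throws Exception;  // push the event downstream
}

// A single-threaded stage simply pumps events from its extractor to its applier.
class StageTask implements Runnable {
    private final EventExtractor extractor;
    private final EventApplier applier;

    StageTask(EventExtractor extractor, EventApplier applier) {
        this.extractor = extractor;
        this.applier = applier;
    }

    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                applier.apply(extractor.extract());
            }
        } catch (Exception e) {
            // A real stage would record its last committed position and
            // take the pipeline offline rather than stopping silently.
        }
    }
}

Because every component is written against the same pair of interfaces, swapping a binlog extractor for a THL extractor, or a MySQL applier for a log applier, is purely a matter of configuration.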

Here are diagrams of a couple of useful single-stage pipelines. 


The "dummy" pipeline reads events directly from MySQL binlogs and just throws them away.  This sounds useless but in fact it is rather convenient.  You can use the dummy pipeline check whether your binlogs are good.  If you add filters you can also use a dummy pipeline to report on what is in the binlog.  Finally, you can use it as a quick and non-intrusive check to see if Tungsten can handle the data in your binlog--a nice way to ensure you can migrate smoothly. 

Here's the dummy pipeline definition:

# Generic pipelines. 
replicator.pipelines=master,slave,direct,dummy
...
# Dummy pipeline has single stage that writes from binlog to bit-bucket.
replicator.pipeline.dummy=binlog-to-dummy
replicator.pipeline.dummy.autoSync=true

replicator.stage.binlog-to-dummy=com.continuent.tungsten.replicator.pipeline.SingleThreadStageTask
replicator.stage.binlog-to-dummy.extractor=mysql
replicator.stage.binlog-to-dummy.applier=dummy
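
If you want the dummy pipeline to report on what it finds, you can attach filters to its stage using the same stage.filters syntax shown for the slave pipeline above.  In the sketch below the logger filter and its class name are assumptions for illustration rather than stock Tungsten components.

# Sketch: attach a reporting filter to the dummy stage.  The 'logger' filter
# and its class name are assumptions for illustration.
replicator.stage.binlog-to-dummy.filters=logger
replicator.filter.logger=com.continuent.tungsten.replicator.filter.LoggingFilter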

The "direct" pipeline fetches events directly from a master MySQL server using client log requests over the network and applies them immediately to a slave.  I use this pipeline to test master-to-slave performance, but it's also very handy for transferring a set of SQL updates from the binlog of any master to any slave on the network.  For instance, you can transfer upgrade commands very efficiently out of the binlog of a successfully upgraded MySQL server to other servers on the network.  You can also use it to "rescue" transactions that are stuck in the binlog of a failed master.  That is starting to be genuinely useful. 

The definition of the direct pipeline is already in the default replicator.properties.mysql template that comes with Tungsten 1.3, so it is not necessary to repeat it here.  You can just download the software (the open source version is here) and have a look at it yourself.  There's almost more documentation than people can bear--look here to find a full set.  Version 1.3 docs will be posted shortly on the website and are already available for commercial customers.  As usual, you can also view the source code on SourceForge.net.

Pipelines belong to a set of major feature improvements to Tungsten to support SaaS and large enterprise deployments.  Some of the other features include fast event logging directly to disk (no more posting events in InnoDB), low-latency WAN transfer, multi-master replication support, and parallel replication.  Stay tuned!