ACCUMULO-4451 Fix line-endings

* Use UNIX line endings
* Remove BOM in unicode text file
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/200be5f2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/200be5f2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/200be5f2

Branch: refs/heads/master
Commit: 200be5f29a8150b9cf2dd2042cfc1d79345ac23e
Parents: 2be9231
Author: Christopher Tubbs <ctubb...@apache.org>
Authored: Fri Sep 9 15:13:53 2016 -0400
Committer: Christopher Tubbs <ctubb...@apache.org>
Committed: Fri Sep 9 15:13:53 2016 -0400

----------------------------------------------------------------------
 .../resources/design/ACCUMULO-378-design.mdtext | 936 +++++++++----------
 1 file changed, 468 insertions(+), 468 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/200be5f2/docs/src/main/resources/design/ACCUMULO-378-design.mdtext
----------------------------------------------------------------------
diff --git a/docs/src/main/resources/design/ACCUMULO-378-design.mdtext b/docs/src/main/resources/design/ACCUMULO-378-design.mdtext
index 7905840..1521110 100644
--- a/docs/src/main/resources/design/ACCUMULO-378-design.mdtext
+++ b/docs/src/main/resources/design/ACCUMULO-378-design.mdtext
@@ -1,468 +1,468 @@

Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Accumulo Multi-DataCenter Replication
=====================================

ACCUMULO-378 deals with disaster recovery techniques in Accumulo through cross-site replication of tables. Data which is written to one Accumulo instance will automatically be replicated to a separate Accumulo instance.


Justification
-------------

Losing an entire instance really stinks. In addition to natural disasters or facility problems, Hadoop always has the potential for failure. In the newest versions of Hadoop, the high-availability (HA) namenode functionality increases the redundancy of Hadoop with regard to the single point of failure which the namenode previously was. Despite this, there is always a varying amount of required administrative intervention to ensure that failure does not result in data loss: userspace software (the entire Hadoop and Java stack), kernel-space software (filesystem implementations), "expected" hardware failures (hard drives), unexpected compute hardware failures (NICs, CPU, memory), and infrastructure failures (switches and routers). Accumulo currently has the ability for manual snapshots/copies across multiple instances; however, this is not sufficient for multiple reasons, the biggest being the lack of automated replication.


Background
----------

Apache HBase has had master-master replication, cyclic replication and multi-peer replication since 0.92. This satisfies a wide range of cross-site replication strategies. Master-master replication lets us have two systems which both replicate to each other. Both systems can service new writes and will update their "view" of a table from one another. Cyclic replication allows us to have cycles in our replication graph. This is a generalization of the master-master strategy in which we may ultimately have a system which replicates to a system that it receives data from. A system with three masters, A, B and C, which replicate in a row (A to B, B to C and C to A) is an example of this. More complicated examples of this can be envisioned when dealing with multiple replicas inside one geographic region or data center. Multi-peer replication is relatively simple in that a single master system will replicate to multiple peers instead of just one.


While these are relatively different from one another, I believe most can be satisfied through a single, master-push, replication implementation. The proposed data structure should also be capable of supporting a peer-pull strategy.


Implementation
--------------

As a first implementation, I will prototype a single master with multiple peer replication strategy. This should grant us the most flexibility and the most functionality. The general implementation should be capable of application to the other replication structures (master-master and cyclic replication). I'll outline a simple master-peer replication use case, followed by application of this approach to replication cycles and master-master replication. This approach does not consider conditional mutations.


### Replication Framework

In an attempt to be as clear as possible, I'll use the following terminology when explaining the implementation: master will refer to the "master" Accumulo cluster (the system accepting new writes), and peer will refer to the "peer" Accumulo cluster (the system which does not receive new data through the Accumulo client API, but only from the master through replication). The design results in an eventual-consistency model of replication which will allow peers to be offline while the online master still processes new updates.


In the simplest notion, when a new file is created by the master, we want to ensure that this file is also sent to the peer. In practice, this new file can either be an RFile that was bulk-imported to the master or a write-ahead log (WAL) file. The bulk-imported RFile is the easy case, but the WAL case merits additional explanation. While data is being written to Accumulo, it is written to a sorted, in-memory map and an append-only WAL file. While the in-memory map provides a very useful interface for the TabletServer to use for scans and compactions, it is difficult to extract new updates at the RFile level. As such, this proposed implementation uses the WAL as the transport "file format"[a]. While it is noted that in sending a WAL to multiple peers, each peer will need to reprocess each WAL to make Mutations to apply whereas they could likely be transformed once, that is left as a future optimization.


To increase the speed at which eventual consistency can be achieved, WAL offsets can be tracked to begin the replication process before a WAL is closed. We can bin these mutations together for a lazy replication which can be combined for each target server, which amortizes the cost into a single write-set message. It is not apparent that this requires co-location within each source tablet in the Accumulo metadata table, which means that the worry of inadvertent errors caused by placing this data in the metadata table is entirely removed.


In every replication graph, which consists of master(s) and peer(s), each system should have a unique identifier. It is desirable to be able to uniquely identify each system, and each system should have knowledge of the other systems participating.


These identifiers also make implementing cyclic replication easier, as a cluster can ignore any requests to replicate some data when that request already contains the current cluster's identifier. In other words, data we try to replicate will contain a linked list of identifiers with the provenance of where that data came from, and each cluster can make the determination of whether or not it has seen this data already (and thus needs to process and propagate it). This also lets us treat replication rules as a graph, which grants us a common terminology to use when describing replication.


This framework provides a general strategy to allow pluggable replication strategies to export data out of an Accumulo cluster. An AccumuloReplicationStrategy is the only presently targeted replication strategy; however, the implementation should not prohibit alternative approaches to replication such as other databases or filesystems.
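
To make the notion of a pluggable strategy concrete, the following is a minimal sketch of what such a contract could look like. The type and method names are purely illustrative (this design does not fix them); the point is only that an Accumulo-to-Accumulo implementation and, say, a filesystem or database export could sit behind the same small interface.

    import java.io.IOException;

    /** A chunk of a source file (RFile or WAL) on the master that needs to reach a peer. */
    class ReplicationChunk {
        final String sourceUri;  // fully-qualified path of the RFile or WAL
        final long startOffset;  // first offset not yet replicated
        final long length;       // amount of data to ship

        ReplicationChunk(String sourceUri, long startOffset, long length) {
            this.sourceUri = sourceUri;
            this.startOffset = startOffset;
            this.length = length;
        }
    }

    /** Hypothetical pluggable export strategy; Accumulo-to-Accumulo is the first target. */
    interface ReplicationStrategy {
        /** Ship the chunk to the named peer, returning the offset replicated through. */
        long replicate(String peerName, ReplicationChunk chunk) throws IOException;
    }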

### Replication Strategy Implementation


Henceforth, both the RFiles and WAL files that need replication can be treated as a chunk of data. This chunk references a start offset and length from the source (RFile or WAL) which needs to be replicated. This has the nice property of being able to use a Combiner to combine multiple, sequential chunks into one larger chunk to amortize RPC costs.
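
As a rough illustration of that amortization, independent of any Accumulo API, combining chunks is just a matter of merging contiguous or overlapping ranges from the same source file; the helper below sketches only that arithmetic, with each chunk represented as a {startOffset, length} pair. Merging [0,100) and [100,250) into [0,250), for instance, turns two RPCs into one for that file.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    /** Sketch: merge sequential chunks (each a {startOffset, length} pair) from one source file. */
    class ChunkMerger {
        static List<long[]> merge(List<long[]> chunks) {
            List<long[]> sorted = new ArrayList<>(chunks);
            sorted.sort(Comparator.comparingLong(c -> c[0]));
            List<long[]> merged = new ArrayList<>();
            for (long[] c : sorted) {
                if (!merged.isEmpty()) {
                    long[] last = merged.get(merged.size() - 1);
                    long lastEnd = last[0] + last[1];
                    if (c[0] <= lastEnd) {
                        // contiguous or overlapping with the previous chunk: extend it
                        last[1] = Math.max(lastEnd, c[0] + c[1]) - last[0];
                        continue;
                    }
                }
                merged.add(new long[] {c[0], c[1]});
            }
            return merged;
        }
    }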

#### Make the master aware of files to replicate


Let us define a column family that is used to denote a chunk that needs to be replicated: REPL. We first need to let the master know that it has a new chunk which needs to be replicated. When the file comes from a bulk import, we need to create a new entry in the !METADATA table for the given tablet with the REPL column family. If the file is a WAL, we also want to write an entry for the REPL column[b]. In both cases, the chunk's URI will be stored in the column qualifier. The Value can contain some serialized data structure to track cluster replication provenance and offset values. Each row (tablet) in the !METADATA table will contain zero to many REPL columns. As such, the garbage collector needs to be modified to not delete these files on the master's HDFS instance until these files are replicated (copied to the peer).


#### Choose local TabletServer to perform replication


The Accumulo Master can have a thread that scans the replication table to look for chunks to replicate. When it finds some, it chooses a TabletServer to perform the replication to all peers. The master should use a FATE operation to manage the state machine of this replication process. The expected principles, such as exponential backoff on network errors, should be followed. When all peers have reported successfully receiving the file, the master can remove the REPL column for the given chunk.


On the peer, before beginning transfer, the peer should ascertain a new local, unique filename to use for the remote file. When the transfer is complete, the file should be treated like log recovery and brought into the appropriate Tablet. If the peer is also a master (replicating to other nodes), the replicated data should create a new REPL column in the peer's table to repeat the replication process, adding in its cluster identifier to the provenance list. Otherwise, the file can be a candidate for deletion by the garbage collector.


The tserver chosen to replicate the data from the master cluster should ideally be the tserver that created that data. This helps reduce the complexity of dealing with locality later on. If the HDFS blocks written by the tserver are local, then we gain the same locality perks.


#### Recurse


In our simple master and peer replication scheme, we are done after the new updates are made available on the peer. As aforementioned, it is relatively easy to "schedule" replication of a new file on the peer because we just repeat the same process that the master did to replicate to the peer in the first place.


### Master cluster replication "bookkeeping"


This section outlines the steps on the master cluster to manage the lifecycle of data: when/what data needs to be replicated and when a file is safe to be removed.


Two key structures are used to implement this bookkeeping:


1. Tablet-level entry: tabletId repl:fully-qualified-file [] value


2. Row-prefix space at the end of accumulo.metadata or its own table: *~repl*_fully-qualified-file clusterName:remoteTableID [] value


These two key structures will be outlined below, with "*repl* column" and "*~repl* row" denoting which is being referred to.


#### Data Structure in Value


To avoid the necessity of using conditional mutations or other "transaction-like" operations, we can use a combiner to generate an aggregate view of replication information. Protobuf is a decent choice; however, the description isn't tied to any implementation. I believe a Combiner used in conjunction with the following data structure provides all necessary functionality:


    ``// Tracks general lifecycle of the data: file is open and might have new data to replicate, or the file has been``
    ``// closed and will have no new data to replicate``


    ``State:Enum { OPEN, CLOSED }``


    ``ReplicationStatus { State state, long replication_needed_offset, long replication_finished_offset }``


The offsets refer to the contiguous ranges of records (key-values) written to the WAL. The replication_finished_offset value tracks what data has been replicated to the given cluster, while the replication_needed_offset value tracks how much data has been written to the WAL that is ready for replication. replication_finished_offset should always be less than or equal to replication_needed_offset. For RFiles instead of WALs, the state is always CLOSED and replication_needed_offset is initialized to the length of the RFile. In this context, one can consider the RFile as a read-only file and the WAL as an append-only file.
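
A minimal sketch of such a Combiner follows, assuming a simple fixed-width encoding (one state byte followed by two 8-byte offsets) rather than the protobuf suggested above; the class name and byte layout are illustrative only.

    import java.nio.ByteBuffer;
    import java.util.Iterator;

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.iterators.Combiner;

    /** Sketch: aggregate ReplicationStatus values (state byte + two long offsets) for one key. */
    public class ReplicationStatusCombiner extends Combiner {

        private static final byte OPEN = 0, CLOSED = 1;

        @Override
        public Value reduce(Key key, Iterator<Value> iter) {
            byte state = OPEN;
            long needed = 0L;    // running total of additions to replication_needed_offset
            long finished = 0L;  // running total of additions to replication_finished_offset
            while (iter.hasNext()) {
                ByteBuffer buf = ByteBuffer.wrap(iter.next().get());
                if (buf.get() == CLOSED) {
                    state = CLOSED;  // CLOSED always wins over OPEN
                }
                needed += buf.getLong();
                finished += buf.getLong();
            }
            ByteBuffer out = ByteBuffer.allocate(1 + 2 * Long.BYTES);
            out.put(state).putLong(needed).putLong(finished);
            return new Value(out.array());
        }
    }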

For *~repl* entries, the target clusterName and remote tableId would be stored in the key to preserve uniqueness. Using this information, we would be able to implement the following methods:


    ``bool isFullyReplicated(ReplicationStatus)``
    ``Pair<long,long> rangeNeedingReplication(ReplicationStatus)``


The isFullyReplicated method is straightforward: given the values for start/finish stored for data that needs to be replicated, the values for start/finish stored for data that has been replicated, and a state of CLOSED, determine whether there is still more data for this ReplicationStatus that needs to be replicated for the given clusterName/tableID.


rangeNeedingReplication is a bit more complicated. Given the end of a range of data that has already been replicated and the end of a range of data that still needs replication, return a range of data that has not yet been replicated. For example, if key-values up to offset 100 in a WAL have already been replicated and key-values up to offset 300 are marked as needing replication, this method should return [101,300]. Ranges of data replicated, and data needing replication, must always be disjoint and contiguous to ensure that data is replayed in the correct order on the peer.
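
A sketch of these two methods over the aggregated offsets follows; the signatures are illustrative (taking the fields directly rather than a ReplicationStatus object) and are not final.

    /** Sketch of the two accessors described above, operating on the aggregated offsets. */
    class ReplicationStatusHelper {

        /** Fully replicated only once the file is CLOSED and the finished offset has caught up. */
        static boolean isFullyReplicated(boolean closed, long neededOffset, long finishedOffset) {
            return closed && finishedOffset >= neededOffset;
        }

        /**
         * Next contiguous range to replicate as {firstOffset, lastOffset}, or null when nothing is
         * outstanding. E.g. finished=100, needed=300 yields {101, 300}, matching the example above.
         */
        static long[] rangeNeedingReplication(long neededOffset, long finishedOffset) {
            if (finishedOffset >= neededOffset) {
                return null;
            }
            return new long[] {finishedOffset + 1, neededOffset};
        }
    }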

A Combiner is used to create a basic notion of "addition" and "subtraction". We cannot use deletes to manage this without creating a custom iterator, which would not be desirable since it would be required to run over the entire accumulo.metadata table. Avoiding deletions except on cleanup is also desired to avoid handling "tombstone'ing" a future version of a Key. The addition operation is when new data is appended to the WAL, which signifies new data to be replicated. This equates to an addition to replication_needed_offset. The subtraction operation is when data from the WAL has been successfully replicated to the peer for this *~repl* record. This is implemented as an addition to replication_finished_offset.


When CLOSED is set on a ReplicationStatus, this implies that the WAL has been closed and no new offsets will be added to be tracked via the *repl* column. As such, a ReplicationStatus "object" is a candidate for deletion when the state is CLOSED and replication_finished_offset is equal to replication_needed_offset. A value of CLOSED for state is always propagated over the OPEN state. An addition after the state is CLOSED is an invalid operation and would be a logic error.


Consider the case of new data being ingested to the cluster; the following discrete steps should happen. The assumption that replication is enabled is made so as not to distract from the actual steps. As previously mentioned, a combiner must be set on the *repl* column to aggregate the values to properly maintain replication state. The following is what a tserver will do (a rough sketch of the key layout follows the steps).


1) When a new WAL is created by request of a tserver, a *repl* column is created within the tablet's row to track that this WAL will need to be replicated.


    INSERT
    tablet repl:hdfs://localhost:8020/accumulo/.../wal/... -> ReplicationStatus(state=OPEN)


2) As the tserver using this WAL finishes commits to the WAL, it should submit a new mutation to track the current length of data in the WAL that it just wrote that needs to be read for purposes of replication.


    INSERT
    tablet repl:hdfs://localhost:8020/accumulo/.../wal/... -> ReplicationStatus(addition offset)


3) Eventually, the tablet server will finish using a WAL, minc the contents of memory to disk, and mark the WAL as unused. This results in updating the state to be CLOSED.


    INSERT
    tablet repl:hdfs://localhost:8020/accumulo/.../wal/... -> ReplicationStatus(state=CLOSED)
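
The tserver would write these entries through its internal metadata-update path, but purely to illustrate the key layout of step 1, the equivalent through the public client API would look roughly like the following; the table name, row value, and serialized status bytes are placeholders.

    import org.apache.accumulo.core.client.AccumuloException;
    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.client.BatchWriterConfig;
    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.TableNotFoundException;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;

    /** Sketch of the repl-column insert made when a new WAL is opened for a tablet. */
    class ReplColumnExample {
        static void markWalOpen(Connector conn, String metadataTable, String tabletRow,
                String walUri, byte[] openStatus) throws AccumuloException, TableNotFoundException {
            BatchWriter bw = conn.createBatchWriter(metadataTable, new BatchWriterConfig());
            Mutation m = new Mutation(tabletRow);
            // family "repl", qualifier = fully-qualified WAL URI, value = serialized ReplicationStatus
            m.put("repl", walUri, new Value(openStatus));
            bw.addMutation(m);
            bw.close();
        }
    }

Steps 2 and 3 have the same shape: the same row, family, and qualifier, with a value encoding either an offset addition or the CLOSED state, left to the combiner to aggregate.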

The master also needs a new thread to process the *repl* columns across all tablets in a table and create *~repl* row entries for the file and where it should be replicated to. The high-level goals for this thread are as follows:


1) Create mutations for a WAL that outline where the file must be replicated to (cluster and tableID)


    INSERT
    *~repl*_hdfs://localhost:8020/accumulo/.../wal/... clusterName:tableId -> ReplicationStatus(addition offset)


2) Determine when the *repl* column in a tablet is safe for deletion (all data for it has been replicated). This is the sign that the GC can then remove this file.


    DELETE
    tablet repl:hdfs://localhost:8020/accumulo/.../wal/...


This can be accomplished with a single thread that scans the metadata table:


1) Construct a "snapshot" of tablet *repl* file entries with aggregated offsets, sorted by file:


    [hdfs://localhost:8020/.../file1 => {[tablet1, RS], [tablet2, RS], ... },
     hdfs://localhost:8020/.../file2 => {[tablet3, RS], [tablet4, RS], ... },
     hdfs://localhost:8020/.../file3 => {[tablet5, [RS:CLOSED]], [tablet6, [RS:CLOSED]], ... }]


2) Begin scanning the *~repl* row-prefix with a Scanner, performing a merged read to join the "state" from the aggregate *repl* column across tablets with the columns in the *~repl* row for the file:


    for each file in *~repl* rowspace:
        if all columns in *~repl*_file row isFullyReplicated:
            issue deletes for file in *repl* column for all tablets with references
            if delete of *repl* is successful:
                delete *~repl* row
        else if *~repl* row exists but no *repl* columns:
            // Catch failure case from first conditional
            delete *~repl* row
        else:
            for each file in "snapshot" of *repl* columns:
                make mutation for *~repl*_file
                for each peer cluster in configuration:
                    if file should be replicated on peer:
                        add column for clusterid:remote_tableID -> RS


A Combiner should be set on all columns in the *~repl* prefix rowspace and on the *repl* colfam so that multiple runs of the described procedure, without actual replication occurring, still correctly aggregate the data that needs replication.
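
A sketch of the first pass (building the per-file snapshot from the aggregated *repl* columns) using the client Scanner API follows; the table name and the simplistic reference counting are illustrative, and the real thread would also decode each aggregated ReplicationStatus and perform the merged read against the *~repl* rows described above.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.client.TableNotFoundException;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.hadoop.io.Text;

    /** Sketch: count, per file, how many tablets still carry an aggregated repl entry for it. */
    class ReplSnapshotExample {
        static Map<String, Integer> snapshotReplColumns(Connector conn, String metadataTable)
                throws TableNotFoundException {
            Map<String, Integer> fileRefs = new HashMap<>();
            Scanner scanner = conn.createScanner(metadataTable, Authorizations.EMPTY);
            scanner.fetchColumnFamily(new Text("repl"));
            for (Map.Entry<Key, Value> entry : scanner) {
                // qualifier holds the fully-qualified file URI; value holds the aggregated status
                String fileUri = entry.getKey().getColumnQualifier().toString();
                fileRefs.merge(fileUri, 1, Integer::sum);
            }
            return fileRefs;
        }
    }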

Configuration
-------------

Replication can be configured on a per-locality-group basis, replicating that locality group to one or more peers. Given that we have dynamic column families, trying to track per-column-family replication would be unnecessarily difficult. New configuration variables need to be introduced to support the necessary information. Each peer is defined with a name and the ZooKeeper quorum of the remote cluster to locate the active Accumulo Master. The API should ease configuring replication across all locality groups. Replication cannot be configured on the root or metadata table.


Site-wide:

    // The name and location of other clusters
    instance.cluster.$name.zookeepers=zk1,zk2,zk3[c]
    // The name of this cluster
    instance.replication.name=my_cluster_name[d]

Per-table:

    // Declare the locality group(s) that should be replicated and the clusters that they should be replicated to
    table.replication.$locality_group_name=cluster1,cluster2,...


Shell commands can also be created to make this configuration easier.


    definecluster cluster_name zookeeper_quorum

e.g. definecluster peer peerZK1:2181,peerZK2:2181,peerZK3:2181


    deletecluster cluster_name zookeeper_quorum

e.g. deletecluster peer peerZK1:2181,peerZK2:2181,peerZK3:2181


    enablereplication -t table (-lg loc_group | --all-loc-groups) cluster_name

e.g. enablereplication -t foo -lg cf1 peer1
e.g. enablereplication -t foo --all-loc-groups peer1


    disablereplication -t table (-lg loc_group | --all-loc-groups) cluster_name

e.g. disablereplication -t foo -lg cf1 peer1
e.g. disablereplication -t foo --all-loc-groups peer1


For peers, we likely do not want to allow users to perform writes against the cluster; thus, they should be read-only. This likely requires custom configuration and some ZooKeeper state to not accept regular API connections. This should be exposed/controllable by the shell, too.


Common Questions
----------------

*How do conditional mutations work with this approach?*


They do not. They will need to throw an Exception.


*How does replication work on a table which already contains data?*


When replication is enabled on a table, all new data will be replicated. This implementation does not attempt to support replicating pre-existing data, as the existing importtable and exporttable already provide support for doing this.


*When I update a table property on the master, will it propagate to the peer?*


There are both arguments for and against this. We likely want to revisit this later as a configuration parameter that could allow the user to choose if this should happen. We should avoid implementations that would tie us to one or the other.


As an argument against this, consider a production and a backup cluster where the backup cluster is smaller in number of nodes but contains more disks. Despite wanting to replicate the data in a table, the configuration of that table may not be desired (e.g. split threshold, compression codecs, etc.). Another argument against could be age-off. If a replica cluster is not the same size as the production cluster (which is extremely plausible), you would not want the same age-off rules for both the production and replica.


An argument for this feature is that you would want custom compaction iterators (as a combiner, for example) to only be configured on a table once. You would want these iterators to appear on all replicas. Such an implementation is also difficult in master-master situations as we don't have a shared ZooKeeper instance that we can use to reliably commit these changes.


*What happens in master-master if two Keys are exactly the same with different values?*


Non-deterministic - mostly because we already have this problem: https://issues.apache.org/jira/browse/ACCUMULO-1528


*Did you come up with this all on your own?*


Ha, no. Big thanks goes out to HBase's documentation, Enis Söztutar (HBase), and other Accumulo devs that I've bounced these ideas off of (too many to enumerate).


Goals

1. Master-Slave configuration that doesn't exclude future master-master work
2. Per locality-group replication configuration
3. Shell administration of replication
4. Accumulo Monitor integration/insight into replication status
5. State machines for lifecycle of chunks
6. Versionable (read-as protobuf) datastructure to track chunk metadata
7. Thrift for RPC
8. Replication does not require "closed" files (can send incremental updates to peers)
9. Ability to replicate "live inserts" and "bulk imports"
10. Provide a replication interface with an Accumulo->Accumulo implementation
11. Do not rely on the active Accumulo Master to perform replication (send or receive) -- delegate to a TabletServer
12. Use FATE where applicable
13. Gracefully handle offline peers
14. Implement read-only variant Master/TabletServer[e]


Non-Goals

1. Replicate on smaller granularity than locality group (not individual colfams/colquals or based on visibilities)
2. Wire security between master and peer
3. Support replication of encrypted data[f]
4. Replication of existing data (use importtable & exporttable)
5. Enforce replication of table configuration


References


* http://www.cs.mcgill.ca/~kemme/papers/vldb00.html

[a] While the WAL is a useful file format for shipping updates (an append-only file), the actual LogFileKey and LogFileValue pairs may not be sufficient? Might need some extra data internally? Maybe the DFSLogger header could contain that?
[b] This approach makes the assumption that we only begin the replication process when a WAL is closed. This is likely too long of a period of time: an offset and length likely need to be included to decrease latency.
[c] This needs to be consistent across clusters. Do we need to control access to ensure that it is? Is it excessive to force users to configure it correctly?
[d] Same as instance.cluster.$name: do we need to enforce these values?
[e] This isn't an immediate necessity, so I'm tempted to consider punting it as a non-goal for the first implementation.
[f] While not in the original scope, it is definitely of great concern.