[ https://issues.apache.org/jira/browse/HBASE-16415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050466#comment-16050466 ]
Jan Kunigk edited comment on HBASE-16415 at 6/15/17 1:23 PM:
-------------------------------------------------------------

Hi Guanghao,

thanks for your feedback. Yes, I agree it would be better to use a new ReplicationEndpoint.

The ReplicationEndpoint is used in the run() method of ReplicationSource, which fetches the endpoint's filter:

{code}
// get the WALEntryFilter from ReplicationEndpoint and add it to default filters
ArrayList<WALEntryFilter> filters = Lists.newArrayList(
    (WALEntryFilter)new SystemTableWALEntryFilter());
WALEntryFilter filterFromEndpoint = this.replicationEndpoint.getWALEntryfilter();
{code}

At the end of this method, it attempts to start a new shipper thread (i.e. ReplicationSourceWALReaderThread):

{code}
tryStartNewShipperThread(walGroupId, queue);
{code}

tryStartNewShipperThread() invokes startNewWALReaderThread(), which creates a ReplicationSourceWALReaderThread, applies all filters to it, and returns it:

{code}
ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
ReplicationSourceWALReaderThread walReader = new ReplicationSourceWALReaderThread(manager,
    replicationQueueInfo, queue, startPosition, fs, conf, readerFilter, metrics);
{code}

I agree that embedding the redirection information into a new ReplicationEndpoint, just as we do with the filters, makes sense. However, the inspection of the individual WAL entries would still have to occur in the shipper threads themselves, like this:

{code}
entry = filterEntry(entry);
entry = redirectEntry(entry);
{code}

Do you agree? Or am I missing something?

> Replication in different namespace
> ----------------------------------
>
>                 Key: HBASE-16415
>                 URL: https://issues.apache.org/jira/browse/HBASE-16415
>             Project: HBase
>          Issue Type: New Feature
>          Components: Replication
>            Reporter: Christian Guegi
>            Assignee: Jan Kunigk
>
> It would be nice to replicate tables from one namespace to another namespace.
> Example:
> Master cluster, namespace=default, table=bar
> Slave cluster, namespace=dr, table=bar
> Replication happens in class ReplicationSink:
>   public void replicateEntries(List<WALEntry> entries, final CellScanner cells, ...){
>     ...
>     TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
>     ...
>     addToHashMultiMap(rowMap, table, clusterIds, m);
>     ...
>     for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
>       batch(entry.getKey(), entry.getValue().values());
>     }
>   }
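
For illustration only, the filter-then-redirect step proposed in the comment (entry = filterEntry(entry); entry = redirectEntry(entry);) could look roughly like the sketch below. It is self-contained and does not use any HBase classes: Entry, process(), the redirections map, and the signatures of filterEntry() and redirectEntry() are all hypothetical stand-ins, and the "drop the hbase namespace" rule only loosely imitates what a system-table filter does. The mapping default -> dr is the example from the issue description.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; none of these types are HBase classes. */
class RedirectSketch {

    /** Simplified stand-in for a WAL entry: only namespace and table are modelled. */
    static final class Entry {
        final String namespace;
        final String table;

        Entry(String namespace, String table) {
            this.namespace = namespace;
            this.table = table;
        }

        @Override
        public String toString() {
            return namespace + ":" + table;
        }
    }

    /** Returns null for entries that should not be replicated (assumed rule: system namespace). */
    static Entry filterEntry(Entry entry) {
        if (entry == null || "hbase".equals(entry.namespace)) {
            return null;
        }
        return entry;
    }

    /** Rewrites the namespace if a redirection is configured; otherwise returns the entry as-is. */
    static Entry redirectEntry(Entry entry, Map<String, String> redirections) {
        if (entry == null) {
            return null;
        }
        String target = redirections.get(entry.namespace);
        return target == null ? entry : new Entry(target, entry.table);
    }

    /** Applies filter, then redirect, to each entry and keeps the survivors in order. */
    static List<Entry> process(List<Entry> entries, Map<String, String> redirections) {
        List<Entry> out = new ArrayList<>();
        for (Entry e : entries) {
            Entry kept = redirectEntry(filterEntry(e), redirections);
            if (kept != null) {
                out.add(kept);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> redirections = new HashMap<>();
        redirections.put("default", "dr"); // example from the issue: default:bar -> dr:bar

        List<Entry> entries = new ArrayList<>();
        entries.add(new Entry("default", "bar"));
        entries.add(new Entry("hbase", "meta"));
        entries.add(new Entry("other", "foo"));

        System.out.println(process(entries, redirections)); // prints [dr:bar, other:foo]
    }
}
```

In this sketch the redirect runs after the filter, so filtering decisions are made on the source-side table name; whether that ordering is the right one for the real patch is an open design question.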