[ 
https://issues.apache.org/jira/browse/HBASE-16415?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050466#comment-16050466
 ] 

Jan Kunigk edited comment on HBASE-16415 at 6/15/17 1:23 PM:
-------------------------------------------------------------

Hi Guanghao, thanks for your feedback.
Yes, I agree it would be better to use a new ReplicationEndpoint.

The ReplicationEndpoint is passed into the run() method of ReplicaitonSource:
{code}
    // get the WALEntryFilter from ReplicationEndpoint and add it to default 
filters
    ArrayList<WALEntryFilter> filters = Lists.newArrayList(
      (WALEntryFilter)new SystemTableWALEntryFilter());
    WALEntryFilter filterFromEndpoint = 
this.replicationEndpoint.getWALEntryfilter();
{code}

then at the end of this method a new attempt for starting the shipper thread 
(i.e. ReplicationSourceWALReaderThread) is launched:
{code}
      tryStartNewShipperThread(walGroupId, queue);
{code}
tryStartNewShipperThread() is invoking startNewWALReaderThread, which returns a 
ReplicationSourceWALReaderThread and also applies all filters to it:
{code}
   ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
    
    ReplicationSourceWALReaderThread walReader = new 
ReplicationSourceWALReaderThread(manager,
        replicationQueueInfo, queue, startPosition, fs, conf, readerFilter, 
metrics);
{code}
I agree, that embedding the redirections information into a new 
ReplicationEndpoint just like we do with the filters makes sense.
But, the inspection of the individual WAL entries would still have to occur in 
the Shipper Threads itself
(like
{code}
entry = filterEntry(entry);
entry = redirectEntry(entry);
{code}
)

Do you agree? Or am I missing something? 


was (Author: jan.kun...@gmail.com):
Hi Guanghao, thanks for your feedback.
Yes, I agree it would be better to use a new ReplicationEndpoint.

The ReplicationEndpoint is passed into the run() method of ReplicaitonSource:
{code}
    // get the WALEntryFilter from ReplicationEndpoint and add it to default 
filters
    ArrayList<WALEntryFilter> filters = Lists.newArrayList(
      (WALEntryFilter)new SystemTableWALEntryFilter());
    WALEntryFilter filterFromEndpoint = 
this.replicationEndpoint.getWALEntryfilter();
{code}

then at the end of this method a new attempt for starting the shipper thread 
(i.e. ReplicationSourceWALReaderThread) is launched:
```
      tryStartNewShipperThread(walGroupId, queue);
```
tryStartNewShipperThread() is invoking startNewWALReaderThread, which returns a 
ReplicationSourceWALReaderThread and also applies all filters to it:
```
   ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
    
    ReplicationSourceWALReaderThread walReader = new 
ReplicationSourceWALReaderThread(manager,
        replicationQueueInfo, queue, startPosition, fs, conf, readerFilter, 
metrics);
```
I agree, that embedding the redirections information into a new 
ReplicationEndpoint just like we do with the filters makes sense.
But, the inspection of the individual WAL entries would still have to occur in 
the Shipper Threads itself
(like
```
entry = filterEntry(entry);
entry = redirectEntry(entry);
```
)

Do you agree? Or am I missing something? 

> Replication in different namespace
> ----------------------------------
>
>                 Key: HBASE-16415
>                 URL: https://issues.apache.org/jira/browse/HBASE-16415
>             Project: HBase
>          Issue Type: New Feature
>          Components: Replication
>            Reporter: Christian Guegi
>            Assignee: Jan Kunigk
>
> It would be nice to replicate tables from one namespace to another namespace.
> Example:
> Master cluster, namespace=default, table=bar
> Slave cluster, namespace=dr, table=bar
> Replication happens in class ReplicationSink:
>   public void replicateEntries(List<WALEntry> entries, final CellScanner 
> cells, ...){
>     ...
>     TableName table = 
> TableName.valueOf(entry.getKey().getTableName().toByteArray());
>     ...
>     addToHashMultiMap(rowMap, table, clusterIds, m);
>     ...
>     for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : 
> rowMap.entrySet()) {
>       batch(entry.getKey(), entry.getValue().values());
>     }
>    }



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to