[ 
https://issues.apache.org/jira/browse/HBASE-19494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292858#comment-16292858
 ] 

wolfgang hoschek edited comment on HBASE-19494 at 12/15/17 5:28 PM:
--------------------------------------------------------------------

The configurable write-time threshold for the filtering is set per peer 
subscriptionId, i.e. per table. 

Here is a proposal: Add a small interface (say "WALEntryFilter") that takes the 
entry's timestamp and table and returns whether the entry should be ignored, 
along these lines:

{code}
public interface WALEntryFilter { // this is a new interface
   // returns true if the entry should be ignored
   boolean filter(TableName table, long writeTime);
}
{code}

Now modify ReplicationSink.replicateEntries() slightly to call that filter if 
the connection implements it:

{code}
ReplicationSink.replicateEntries(...) {
  ...
  for (WALEntry entry : entries) {
     TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
     Connection conn = getConnection();
     if (conn instanceof WALEntryFilter) { // this is a new interface
       if (((WALEntryFilter)conn).filter(table, entry.getKey().getWriteTime())) {
          continue; // ignore unwanted entry
       }
     }
     ... same as current code
   }
   ... same as current code   
}
{code}

Then IndexerConnection implements not just Connection but also the 
WALEntryFilter interface, along these lines:

{code}
public class IndexerConnection implements Connection, WALEntryFilter { 
   private Map<String, Long> timestampThresholds = ...
   public boolean filter(TableName table, long writeTime) {
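       // assumption: a table without a configured threshold is never filtered
       Long timestampThreshold = timestampThresholds.get(table.getNameAsString());
       return timestampThreshold != null && writeTime < timestampThreshold;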
   }
   ... and so on
}
{code}
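
To make the intended behaviour concrete, here is a minimal, self-contained sketch of the same idea that compiles without HBase on the classpath. Everything in it is an assumption for illustration only: Entry and the plain String table name are hypothetical stand-ins for WALEntry and TableName, ThresholdFilter stands in for IndexerConnection, and applyFilter mirrors only the filtering part of the loop in replicateEntries(). Treating a table without a configured threshold as "never filtered" is also an assumption, not something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch only: Entry and the String table name are hypothetical stand-ins
// for HBase's WALEntry and TableName.
class WalFilterSketch {

    /** Proposed interface: return true if the entry should be ignored. */
    interface WALEntryFilter {
        boolean filter(String table, long writeTime);
    }

    /** Stand-in for a WAL entry: just the table name and the write time. */
    static final class Entry {
        final String table;
        final long writeTime;

        Entry(String table, long writeTime) {
            this.table = table;
            this.writeTime = writeTime;
        }
    }

    /** Stand-in for IndexerConnection: one write-time threshold per table. */
    static final class ThresholdFilter implements WALEntryFilter {
        private final Map<String, Long> timestampThresholds;

        ThresholdFilter(Map<String, Long> timestampThresholds) {
            this.timestampThresholds = timestampThresholds;
        }

        @Override
        public boolean filter(String table, long writeTime) {
            Long threshold = timestampThresholds.get(table);
            // Assumption: a table with no configured threshold is never filtered.
            return threshold != null && writeTime < threshold;
        }
    }

    /** Mirrors the loop in replicateEntries(): skip entries the filter rejects. */
    static List<Entry> applyFilter(Object conn, List<Entry> entries) {
        List<Entry> kept = new ArrayList<>();
        for (Entry entry : entries) {
            if (conn instanceof WALEntryFilter
                    && ((WALEntryFilter) conn).filter(entry.table, entry.writeTime)) {
                continue; // ignore unwanted entry
            }
            kept.add(entry);
        }
        return kept;
    }

    public static void main(String[] args) {
        WALEntryFilter conn = new ThresholdFilter(Map.of("t1", 100L));
        List<Entry> entries = List.of(
                new Entry("t1", 50L),   // written before the threshold
                new Entry("t1", 150L),  // written after the threshold
                new Entry("t2", 1L));   // table without a threshold
        System.out.println("kept " + applyFilter(conn, entries).size() + " entries");
    }
}
```

A connection that does not implement the interface passes through the instanceof check untouched, so existing sinks would see no behaviour change under this sketch.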







> Create simple WALKey filter that can be plugged in on the Replication Sink
> --------------------------------------------------------------------------
>
>                 Key: HBASE-19494
>                 URL: https://issues.apache.org/jira/browse/HBASE-19494
>             Project: HBase
>          Issue Type: Sub-task
>          Components: Replication
>            Reporter: stack
>            Assignee: stack
>             Fix For: 2.0.0-beta-1
>
>
> hbase-indexer used to look at WALKeys on the sink to see if their time of 
> creation was before the time at which the replication stream was enabled.
> In the parent redo, there is no means for doing this anymore (because WALKey 
> used to be Private and because to get at the WALKey in the Sink, you had to 
> override all of the Replication which meant importing a million Private 
> objects...).
> This issue is about adding a simple filter to Replication on the sink-side 
> that just takes a WALKey (now InterfaceAudience LimitedPrivate and recently 
> made read-only).
> Assigned myself. Need to do this so hbase-indexer can move to hbase2.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
