[ https://issues.apache.org/jira/browse/HBASE-19494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292858#comment-16292858 ]
wolfgang hoschek (whoschek) edited comment on HBASE-19494 at 12/15/17 5:27 PM:
--------------------------------------------------------------------

The configurable write-time threshold used for the filtering is set per peer subscriptionId, i.e. per table.

Here is a proposal: add a small interface (say, "WALEntryFilter") that takes an entry's table and write time and returns whether the entry should be ignored, along these lines:

{code}
import org.apache.hadoop.hbase.TableName;

public interface WALEntryFilter { // this is a new interface
  /** Returns true if the entry for this table, written at writeTime, should be ignored. */
  boolean filter(TableName table, long writeTime);
}
{code}

Next, modify ReplicationSink.replicateEntries() slightly so that it calls the filter if the connection implements it:

{code}
ReplicationSink.replicateEntries(...) {
  ...
  for (WALEntry entry : entries) {
    TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
    Connection conn = getConnection();
    // Only connections that opt in by implementing the new WALEntryFilter interface are consulted.
    if (conn instanceof WALEntryFilter
        && ((WALEntryFilter) conn).filter(table, entry.getKey().getWriteTime())) {
      continue; // ignore unwanted entry
    }
    ... same as current code
  }
  ... same as current code
}
{code}

Then IndexerConnection implements not only Connection but also the WALEntryFilter interface, along these lines:

{code}
public class IndexerConnection implements Connection, WALEntryFilter {
  // Write-time threshold per table name (one per peer subscriptionId).
  private Map<String, Long> timestampThresholds = ...

  @Override
  public boolean filter(TableName table, long writeTime) {
    Long timestampThreshold = timestampThresholds.get(table.getNameAsString());
    // Assumption: a table with no configured threshold is not filtered.
    return timestampThreshold != null && writeTime < timestampThreshold;
  }
  ... and so on
}
{code}

> Create simple WALKey filter that can be plugged in on the Replication Sink
> --------------------------------------------------------------------------
>
>                 Key: HBASE-19494
>                 URL: https://issues.apache.org/jira/browse/HBASE-19494
>             Project: HBase
>          Issue Type: Sub-task
>          Components: Replication
>            Reporter: stack
>            Assignee: stack
>             Fix For: 2.0.0-beta-1
>
>
> hbase-indexer used to look at WALKeys on the sink to see if their time of
> creation was before the time at which the replication stream was enabled.
> In the parent redo, there is no means for doing this anymore (because WALKey
> used to be Private and because to get at the WALKey in the Sink, you had to
> override all of the Replication which meant importing a million Private
> objects...).
> This issue is about adding a simple filter to Replication on the sink-side
> that just takes a WALKey (now InterfaceAudience LimitedPrivate and recently
> made read-only).
> Assigned myself. Need to do this so hbase-indexer can move to hbase2.
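As a rough, stand-alone illustration of the per-table threshold filtering proposed in the comment above, the Java sketch below runs without HBase. It is not HBase code: WalEntryFilter, SinkEntry, ThresholdFilter and SinkSketch are hypothetical names, table names are plain strings rather than TableName, and a table with no configured threshold is assumed to pass through unfiltered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Entry point declared first so single-file mode ("java SinkSketch.java") finds main.
class SinkSketch {
  // Mirrors the proposed loop in replicateEntries(): an entry is skipped only when the
  // connection opts in by implementing WalEntryFilter and the filter returns true.
  static List<SinkEntry> applyFilter(List<SinkEntry> entries, Object conn) {
    List<SinkEntry> kept = new ArrayList<>();
    for (SinkEntry entry : entries) {
      if (conn instanceof WalEntryFilter
          && ((WalEntryFilter) conn).filter(entry.table, entry.writeTime)) {
        continue; // written before the table's threshold: ignore
      }
      kept.add(entry); // a real sink would apply the entry here
    }
    return kept;
  }

  public static void main(String[] args) {
    Map<String, Long> thresholds = new HashMap<>();
    thresholds.put("users", 1000L);
    WalEntryFilter filter = new ThresholdFilter(thresholds);
    List<SinkEntry> entries = List.of(
        new SinkEntry("users", 500L),   // before the threshold: skipped
        new SinkEntry("users", 1500L),  // after the threshold: kept
        new SinkEntry("orders", 10L));  // no threshold configured: kept
    System.out.println(applyFilter(entries, filter)); // prints [users@1500, orders@10]
  }
}

// Hypothetical stand-in for the proposed WALEntryFilter (table names as plain strings).
interface WalEntryFilter {
  /** Returns true if the entry for table, written at writeTime, should be ignored. */
  boolean filter(String table, long writeTime);
}

// Hypothetical minimal view of a WAL entry: only the key fields the filter reads.
final class SinkEntry {
  final String table;
  final long writeTime;

  SinkEntry(String table, long writeTime) {
    this.table = table;
    this.writeTime = writeTime;
  }

  @Override
  public String toString() {
    return table + "@" + writeTime;
  }
}

// Per-table threshold filter, mirroring the proposed IndexerConnection.filter().
final class ThresholdFilter implements WalEntryFilter {
  private final Map<String, Long> thresholds;

  ThresholdFilter(Map<String, Long> thresholds) {
    this.thresholds = new HashMap<>(thresholds); // defensive copy
  }

  @Override
  public boolean filter(String table, long writeTime) {
    Long threshold = thresholds.get(table);
    // Assumption: a table with no configured threshold is never filtered.
    return threshold != null && writeTime < threshold;
  }
}
```

Running it prints [users@1500, orders@10]: the users entry written before its threshold is dropped, while the later users entry and the orders entry pass through. Keeping the opt-in as an instanceof check means a sink whose connection does not implement the filter behaves exactly as before.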