HBASE-15424 Add bulk load hfile-refs for replication in ZK after the event is appended in the WAL
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/25419d8b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/25419d8b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/25419d8b Branch: refs/heads/HBASE-14850 Commit: 25419d8b18dd8f35a102614cd31b274659f747ef Parents: 5d79790 Author: Ashish Singhi <ashishsin...@apache.org> Authored: Fri Apr 1 15:40:36 2016 +0530 Committer: Ashish Singhi <ashishsin...@apache.org> Committed: Fri Apr 1 15:40:36 2016 +0530 ---------------------------------------------------------------------- .../hbase/regionserver/wal/AbstractFSWAL.java | 4 +- .../hbase/regionserver/wal/MetricsWAL.java | 7 ++- .../regionserver/wal/WALActionsListener.java | 10 +++- .../replication/regionserver/Replication.java | 50 ++++++++++++-------- .../hadoop/hbase/wal/DisabledWALProvider.java | 7 +-- .../hbase/regionserver/wal/TestMetricsWAL.java | 10 ++-- .../hbase/wal/WALPerformanceEvaluation.java | 3 +- 7 files changed, 58 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/25419d8b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index f189ff1..b89488a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -840,14 +840,14 @@ public abstract class AbstractFSWAL<W> implements WAL { return true; } - private long postAppend(final Entry e, final long elapsedTime) { + private long postAppend(final Entry e, final long elapsedTime) throws IOException { long len = 0; if (!listeners.isEmpty()) { for (Cell cell : e.getEdit().getCells()) { len += CellUtil.estimatedSerializedSizeOf(cell); } for (WALActionsListener listener : listeners) { - listener.postAppend(len, elapsedTime); + listener.postAppend(len, elapsedTime, e.getKey(), e.getEdit()); } } return len; http://git-wip-us.apache.org/repos/asf/hbase/blob/25419d8b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java index 99792e5..69a31cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/MetricsWAL.java @@ -20,9 +20,13 @@ package org.apache.hadoop.hbase.regionserver.wal; import com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.util.StringUtils; @@ -51,7 +55,8 @@ public class MetricsWAL extends WALActionsListener.Base { } @Override - public void postAppend(final long size, final long time) { + public void postAppend(final long size, final long time, final WALKey logkey, + final WALEdit logEdit) throws IOException { source.incrementAppendCount(); source.incrementAppendTime(time); source.incrementAppendSize(size); http://git-wip-us.apache.org/repos/asf/hbase/blob/25419d8b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java index a6452e2..adcc6eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java @@ -98,8 +98,12 @@ public interface WALActionsListener { * TODO: Combine this with above. * @param entryLen approx length of cells in this append. * @param elapsedTimeMillis elapsed time in milliseconds. + * @param logKey A WAL key + * @param logEdit A WAL edit containing list of cells. + * @throws IOException if any network or I/O error occurred */ - void postAppend(final long entryLen, final long elapsedTimeMillis); + void postAppend(final long entryLen, final long elapsedTimeMillis, final WALKey logKey, + final WALEdit logEdit) throws IOException; /** * For notification post writer sync. Used by metrics system at least. @@ -136,7 +140,9 @@ public interface WALActionsListener { } @Override - public void postAppend(final long entryLen, final long elapsedTimeMillis) {} + public void postAppend(final long entryLen, final long elapsedTimeMillis, final WALKey logKey, + final WALEdit logEdit) throws IOException { + } @Override public void postSync(final long timeInNanos, final int handlerSyncs) {} http://git-wip-us.apache.org/repos/asf/hbase/blob/25419d8b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java index bb4a5a3..fa5e222 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java @@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.NavigableMap; import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -256,6 +257,34 @@ public class Replication extends WALActionsListener.Base implements scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager()); } + @Override + public void postAppend(long entryLen, long elapsedTimeMillis, final WALKey logKey, + final WALEdit edit) throws IOException { + NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes(); + if (this.replicationForBulkLoadData && scopes != null && !scopes.isEmpty()) { + TableName tableName = logKey.getTablename(); + for (Cell c : edit.getCells()) { + // Only check for bulk load events + if (CellUtil.matchingQualifier(c, WALEdit.BULK_LOAD)) { + BulkLoadDescriptor bld = null; + try { + bld = WALEdit.getBulkLoadDescriptor(c); + } catch (IOException e) { + LOG.error("Failed to get bulk load events information from the wal file.", e); + throw e; + } + + for (StoreDescriptor s : bld.getStoresList()) { + byte[] fam = s.getFamilyName().toByteArray(); + if (scopes.containsKey(fam)) { + addHFileRefsToQueue(this.getReplicationManager(), tableName, fam, s); + } + } + } + } + } + } + /** * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from * compaction WAL edits and if the scope is local. @@ -268,26 +297,9 @@ public class Replication extends WALActionsListener.Base implements WALEdit logEdit, Configuration conf, ReplicationSourceManager replicationManager) throws IOException { boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf); - byte[] family; boolean foundOtherEdits = false; for (Cell cell : logEdit.getCells()) { - if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { - if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) { - try { - BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell); - for (StoreDescriptor s : bld.getStoresList()) { - family = s.getFamilyName().toByteArray(); - addHFileRefsToQueue(replicationManager, logKey.getTablename(), family, s); - } - } catch (IOException e) { - LOG.error("Failed to get bulk load events information from the wal file.", e); - throw e; - } - } else { - // Skip the flush/compaction/region events - continue; - } - } else { + if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { foundOtherEdits = true; } } @@ -301,7 +313,7 @@ public class Replication extends WALActionsListener.Base implements try { replicationManager.addHFileRefs(tableName, family, s.getStoreFileList()); } catch (ReplicationException e) { - LOG.error("Failed to create hfile references in ZK.", e); + LOG.error("Failed to add hfile references in the replication queue.", e); throw new IOException(e); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/25419d8b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 028c60b..10fe04c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -153,16 +153,17 @@ class DisabledWALProvider implements WALProvider { } @Override - public long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) { + public long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) + throws IOException { if (!this.listeners.isEmpty()) { final long start = System.nanoTime(); long len = 0; for (Cell cell : edits.getCells()) { len += CellUtil.estimatedSerializedSizeOf(cell); } - final long elapsed = (System.nanoTime() - start)/1000000l; + final long elapsed = (System.nanoTime() - start) / 1000000L; for (WALActionsListener listener : this.listeners) { - listener.postAppend(len, elapsed); + listener.postAppend(len, elapsed, key, edits); } } return -1; http://git-wip-us.apache.org/repos/asf/hbase/blob/25419d8b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java index 2e2aa08..feb6010 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestMetricsWAL.java @@ -60,10 +60,10 @@ public class TestMetricsWAL { MetricsWALSource source = new MetricsWALSourceImpl(); MetricsWAL metricsWAL = new MetricsWAL(source); // One not so slow append (< 1000) - metricsWAL.postAppend(1, 900); + metricsWAL.postAppend(1, 900, null, null); // Two slow appends (> 1000) - metricsWAL.postAppend(1, 1010); - metricsWAL.postAppend(1, 2000); + metricsWAL.postAppend(1, 1010, null, null); + metricsWAL.postAppend(1, 2000, null, null); assertEquals(2, source.getSlowAppendCount()); } @@ -71,8 +71,8 @@ public class TestMetricsWAL { public void testWalWrittenInBytes() throws Exception { MetricsWALSource source = mock(MetricsWALSourceImpl.class); MetricsWAL metricsWAL = new MetricsWAL(source); - metricsWAL.postAppend(100, 900); - metricsWAL.postAppend(200, 2000); + metricsWAL.postAppend(100, 900, null, null); + metricsWAL.postAppend(200, 2000, null, null); verify(source, times(1)).incrementWrittenBytes(100); verify(source, times(1)).incrementWrittenBytes(200); } http://git-wip-us.apache.org/repos/asf/hbase/blob/25419d8b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java index 4a15d3c..7ce03b6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/WALPerformanceEvaluation.java @@ -525,7 +525,8 @@ public final class WALPerformanceEvaluation extends Configured implements Tool { } @Override - public void postAppend(final long size, final long elapsedTime) { + public void postAppend(final long size, final long elapsedTime, final WALKey logkey, + final WALEdit logEdit) { appendMeter.mark(size); } });