This is an automated email from the ASF dual-hosted git repository. bbeaudreault pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push: new ee0c9212588 HBASE-26581 Add metrics for failed replication edits (#4347) ee0c9212588 is described below commit ee0c92125885d92c6538e0eae8018c9d9787dc10 Author: Bri Augenreich <bbcr...@vt.edu> AuthorDate: Tue Apr 26 17:42:54 2022 -0400 HBASE-26581 Add metrics for failed replication edits (#4347) Co-authored-by: Briana Augenreich <baugenre...@hubspot.com> Signed-off-by: Andrew Purtell <apurt...@apache.org> Signed-off-by: Bryan Beaudreault <bbeaudrea...@apache.org> --- .../MetricsReplicationGlobalSourceSourceImpl.java | 7 +++ .../regionserver/MetricsReplicationSinkSource.java | 3 + .../MetricsReplicationSinkSourceImpl.java | 13 +++++ .../MetricsReplicationSourceSource.java | 2 + .../MetricsReplicationSourceSourceImpl.java | 10 ++++ .../replication/regionserver/MetricsSink.java | 15 +++++ .../replication/regionserver/MetricsSource.java | 9 +++ .../replication/regionserver/ReplicationSink.java | 3 + .../regionserver/ReplicationSourceShipper.java | 1 + .../regionserver/TestReplicationSink.java | 66 +++++++++++++++++++++- 10 files changed, 127 insertions(+), 2 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java index 547617a1669..a838195b445 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java @@ -36,6 +36,7 @@ public class MetricsReplicationGlobalSourceSourceImpl private final MutableFastCounter logReadInEditsCounter; private final MutableFastCounter walEditsFilteredCounter; private final MutableFastCounter shippedBatchesCounter; + private final 
MutableFastCounter failedBatchesCounter; private final MutableFastCounter shippedOpsCounter; private final MutableFastCounter shippedBytesCounter; private final MutableFastCounter logReadInBytesCounter; @@ -62,6 +63,8 @@ public class MetricsReplicationGlobalSourceSourceImpl shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L); + failedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_FAILED_BATCHES, 0L); + shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L); shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L); @@ -119,6 +122,10 @@ public class MetricsReplicationGlobalSourceSourceImpl shippedBatchesCounter.incr(batches); } + @Override public void incrFailedBatches() { + failedBatchesCounter.incr(); + } + @Override public void incrOpsShipped(long ops) { shippedOpsCounter.incr(ops); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java index 2498e3426a5..fe11c1049ce 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java @@ -24,13 +24,16 @@ import org.apache.yetus.audience.InterfaceAudience; public interface MetricsReplicationSinkSource { public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp"; public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches"; + public static final String SINK_FAILED_BATCHES = "sink.failedBatches"; public static final String SINK_APPLIED_OPS = "sink.appliedOps"; public static final String SINK_APPLIED_HFILES = "sink.appliedHFiles"; void setLastAppliedOpAge(long age); void incrAppliedBatches(long batches); void 
incrAppliedOps(long batchsize); + void incrFailedBatches(); long getLastAppliedOpAge(); void incrAppliedHFiles(long hfileSize); long getSinkAppliedOps(); + long getFailedBatches(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java index ce45af5ccec..86bc60577a6 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java @@ -27,12 +27,14 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS private final MutableHistogram ageHist; private final MutableFastCounter batchesCounter; + private final MutableFastCounter failedBatchesCounter; private final MutableFastCounter opsCounter; private final MutableFastCounter hfilesCounter; public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) { ageHist = rms.getMetricsRegistry().newTimeHistogram(SINK_AGE_OF_LAST_APPLIED_OP); batchesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_BATCHES, 0L); + failedBatchesCounter = rms.getMetricsRegistry().getCounter(SINK_FAILED_BATCHES, 0L); opsCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_OPS, 0L); hfilesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_HFILES, 0L); } @@ -49,6 +51,16 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS opsCounter.incr(batchsize); } + @Override + public void incrFailedBatches(){ + failedBatchesCounter.incr(); + } + + @Override + public long getFailedBatches() { + return failedBatchesCounter.value(); + } + @Override public long getLastAppliedOpAge() { return ageHist.getMax(); @@ -62,4 +74,5 @@ public class MetricsReplicationSinkSourceImpl 
implements MetricsReplicationSinkS @Override public long getSinkAppliedOps() { return opsCounter.value(); } + } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java index 42e28f5d0f3..e9ce8c66247 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java @@ -27,6 +27,7 @@ public interface MetricsReplicationSourceSource extends BaseSource { public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue"; public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp"; public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches"; + public static final String SOURCE_FAILED_BATCHES = "source.failedBatches"; public static final String SOURCE_SHIPPED_BYTES = "source.shippedBytes"; public static final String SOURCE_SHIPPED_OPS = "source.shippedOps"; @@ -57,6 +58,7 @@ public interface MetricsReplicationSourceSource extends BaseSource { void decrSizeOfLogQueue(int size); void incrLogEditsFiltered(long size); void incrBatchesShipped(int batches); + void incrFailedBatches(); void incrOpsShipped(long ops); void incrShippedBytes(long size); void incrLogReadInBytes(long size); diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java index faf14f79cfb..049e849b52c 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java +++ 
b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java @@ -34,6 +34,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final String logEditsFilteredKey; private final String shippedBatchesKey; private final String shippedOpsKey; + private final String failedBatchesKey; private String keyPrefix; private final String shippedBytesKey; @@ -48,6 +49,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou private final MutableFastCounter logReadInEditsCounter; private final MutableFastCounter walEditsFilteredCounter; private final MutableFastCounter shippedBatchesCounter; + private final MutableFastCounter failedBatchesCounter; private final MutableFastCounter shippedOpsCounter; private final MutableFastCounter shippedBytesCounter; private final MutableFastCounter logReadInBytesCounter; @@ -85,6 +87,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou shippedBatchesKey = this.keyPrefix + "shippedBatches"; shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L); + failedBatchesKey = this.keyPrefix + "failedBatches"; + failedBatchesCounter = rms.getMetricsRegistry().getCounter(failedBatchesKey, 0L); + shippedOpsKey = this.keyPrefix + "shippedOps"; shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L); @@ -158,6 +163,10 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou shippedBatchesCounter.incr(batches); } + @Override public void incrFailedBatches() { + failedBatchesCounter.incr(); + } + @Override public void incrOpsShipped(long ops) { shippedOpsCounter.incr(ops); } @@ -176,6 +185,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou rms.removeMetric(sizeOfLogQueueKey); rms.removeMetric(shippedBatchesKey); + rms.removeMetric(failedBatchesKey); rms.removeMetric(shippedOpsKey); 
rms.removeMetric(shippedBytesKey); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java index dede79d138c..18cb147af46 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java @@ -84,6 +84,21 @@ public class MetricsSink { mss.incrAppliedHFiles(hfileSize); } + /** + * Convenience method to update metrics when a batch of operations has failed. + */ + public void incrementFailedBatches(){ + mss.incrFailedBatches(); + } + + /** + * Get the count of the failed batches + * @return failedBatches + */ + protected long getFailedBatches() { + return mss.getFailedBatches(); + } + /** * Get the Age of Last Applied Op * @return ageOfLastAppliedOp diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java index 3ab08065ca7..18f4354d3c5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java @@ -230,6 +230,15 @@ public class MetricsSource implements BaseSource { globalSourceSource.incrShippedBytes(sizeInBytes); } + /** + * Convenience method to update metrics when a batch of operations has failed. + */ + public void incrementFailedBatches(){ + singleSourceSource.incrFailedBatches(); + globalSourceSource.incrFailedBatches(); + } + + /** * Gets the number of edits not eligible for replication this source queue logs so far. * @return logEditsFiltered non-replicable edits filtered from this queue logs.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index d1ee0220a9d..ffec5c1eebd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -193,6 +193,7 @@ public class ReplicationSink { for (int i = 0; i < count; i++) { // Throw index out of bounds if our cell count is off if (!cells.advance()) { + this.metrics.incrementFailedBatches(); throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); } } @@ -205,6 +206,7 @@ public class ReplicationSink { for (int i = 0; i < count; i++) { // Throw index out of bounds if our cell count is off if (!cells.advance()) { + this.metrics.incrementFailedBatches(); throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); } Cell cell = cells.current(); @@ -281,6 +283,7 @@ public class ReplicationSink { this.totalReplicatedEdits.addAndGet(totalReplicated); } catch (IOException ex) { LOG.error("Unable to accept edit because:", ex); + this.metrics.incrementFailedBatches(); throw ex; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 33869dbf7c7..52032097b67 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -230,6 +230,7 @@ public class ReplicationSourceShipper extends Thread { } break; } catch (Exception ex) { + source.getSourceMetrics().incrementFailedBatches(); LOG.warn("{} threw unknown exception:", 
source.getReplicationEndpoint().getClass().getName(), ex); if (sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries, sleepMultiplier, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java index 10a5affcbce..26d33678979 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver; import static org.junit.Assert.assertEquals; + +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -69,15 +71,14 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey; + @Category({ReplicationTests.class, LargeTests.class}) public class TestReplicationSink { @@ -427,6 +428,67 @@ public class TestReplicationSink { // Clean up the created hfiles or it will mess up subsequent tests } + /** + * Test failure metrics produced for failed replication edits + */ + @Test + public void testFailedReplicationSinkMetrics() throws IOException { + long initialFailedBatches = SINK.getSinkMetrics().getFailedBatches(); + long errorCount = 0L; + List<WALEntry> entries = new ArrayList<>(BATCH_SIZE); + List<Cell> cells = new 
ArrayList<>(); + for(int i = 0; i < BATCH_SIZE; i++) { + entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + } + cells.clear(); // cause IndexOutOfBoundsException + try { + SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), + replicationClusterId, baseNamespaceDir, hfileArchiveDir); + Assert.fail("Should re-throw ArrayIndexOutOfBoundsException."); + } catch (ArrayIndexOutOfBoundsException e) { + errorCount++; + assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches()); + } + + entries.clear(); + cells.clear(); + TableName notExistTable = TableName.valueOf("notExistTable"); // cause TableNotFoundException + for (int i = 0; i < BATCH_SIZE; i++) { + entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells)); + } + try { + SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), + replicationClusterId, baseNamespaceDir, hfileArchiveDir); + Assert.fail("Should re-throw TableNotFoundException."); + } catch (TableNotFoundException e) { + errorCount++; + assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches()); + } + + entries.clear(); + cells.clear(); + for(int i = 0; i < BATCH_SIZE; i++) { + entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells)); + } + // cause IOException in batch() + try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) { + try (Admin admin = conn.getAdmin()) { + admin.disableTable(TABLE_NAME1); + try { + SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()), + replicationClusterId, baseNamespaceDir, hfileArchiveDir); + Assert.fail("Should re-throw IOException."); + } catch (IOException e) { + errorCount++; + assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches()); + } finally { + admin.enableTable(TABLE_NAME1); + } + } + } + } + + private WALEntry createEntry(TableName table, int row, KeyValue.Type type, 
List<Cell> cells) { byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2; byte[] rowBytes = Bytes.toBytes(row);