This is an automated email from the ASF dual-hosted git repository. skadam pushed a commit to branch 4.x-HBase-1.5 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push: new 33d34b7 PHOENIX-5383: Metrics for the IndexRegionObserver coprocessor 33d34b7 is described below commit 33d34b72ecc7bfa55bf2998f7d8d7d90731a2a84 Author: Priyank Porwal <ppor...@salesforce.com> AuthorDate: Fri Jul 12 09:41:33 2019 -0700 PHOENIX-5383: Metrics for the IndexRegionObserver coprocessor Signed-off-by: s.kadam <ska...@apache.org> --- .../phoenix/hbase/index/IndexRegionObserver.java | 110 +++------------------ .../org/apache/phoenix/hbase/index/Indexer.java | 2 +- .../hbase/index/metrics/MetricsIndexerSource.java | 54 ++++++++++ .../index/metrics/MetricsIndexerSourceFactory.java | 10 +- .../index/metrics/MetricsIndexerSourceImpl.java | 50 ++++++++++ 5 files changed, 125 insertions(+), 101 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 516a088..7c37c7a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -66,27 +66,20 @@ import org.apache.phoenix.hbase.index.builder.IndexBuilder; import org.apache.phoenix.hbase.index.covered.IndexMetaData; import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource; import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory; -import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; -import org.apache.phoenix.hbase.index.write.IndexFailurePolicy; import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter; -import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter; -import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache; -import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexMetaData; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; import org.apache.phoenix.util.EnvironmentEdgeManager; -import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.ServerUtil.ConnectionType; import com.google.common.collect.Lists; -import com.google.common.collect.Multimap; /** * Do all the work of managing index updates from a single coprocessor. All Puts/Delets are passed @@ -208,20 +201,11 @@ public class IndexRegionObserver extends BaseRegionObserver { // The collection of pending data table rows private Map<ImmutableBytesPtr, PendingRow> pendingRows = new ConcurrentHashMap<>(); - /** - * cache the failed updates to the various regions. Used for making the WAL recovery mechanisms - * more robust in the face of recoverying index regions that were on the same server as the - * primary table region - */ - private PerRegionIndexWriteCache failedIndexEdits = new PerRegionIndexWriteCache(); - private MetricsIndexerSource metricSource; private boolean stopped; private boolean disabled; - private long slowIndexWriteThreshold; private long slowIndexPrepareThreshold; - private long slowPostOpenThreshold; private long slowPreIncrementThreshold; private int rowLockWaitDuration; @@ -259,7 +243,7 @@ public class IndexRegionObserver extends BaseRegionObserver { this.lockManager = new LockManager(); // Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat - this.metricSource = MetricsIndexerSourceFactory.getInstance().create(); + this.metricSource = MetricsIndexerSourceFactory.getInstance().getIndexerSource(); setSlowThresholds(e.getConfiguration()); } catch (NoSuchMethodError ex) { disabled = true; @@ -274,10 +258,6 @@ public class IndexRegionObserver extends BaseRegionObserver { private void setSlowThresholds(Configuration c) { slowIndexPrepareThreshold = c.getLong(INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY, INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT); - slowIndexWriteThreshold = c.getLong(INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY, - INDEXER_INDEX_PREPARE_SLOW_THREHSOLD_DEFAULT); - slowPostOpenThreshold = c.getLong(INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY, - INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT); slowPreIncrementThreshold = c.getLong(INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY, INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT); } @@ -356,21 +336,11 @@ public class IndexRegionObserver extends BaseRegionObserver { if (this.disabled) { return; } - long start = EnvironmentEdgeManager.currentTimeMillis(); try { preBatchMutateWithExceptions(c, miniBatchOp); return; } catch (Throwable t) { rethrowIndexingException(t); - } finally { - long duration = EnvironmentEdgeManager.currentTimeMillis() - start; - if (duration >= slowIndexPrepareThreshold) { - if (LOG.isDebugEnabled()) { - LOG.debug(getCallTooSlowMessage("preBatchMutate", duration, slowIndexPrepareThreshold)); - } - metricSource.incrementNumSlowIndexPrepareCalls(); - } - metricSource.updateIndexPrepareTime(duration); } throw new RuntimeException( "Somehow didn't return an index update but also didn't propagate the failure to the client!"); @@ -535,20 +505,11 @@ public class IndexRegionObserver extends BaseRegionObserver { if (current == null) { current = NullSpan.INSTANCE; } - long start = EnvironmentEdgeManager.currentTimeMillis(); // get the index updates for all elements in this batch Collection<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdates = this.builder.getIndexUpdates(miniBatchOp, mutations); - long duration = EnvironmentEdgeManager.currentTimeMillis() - start; - if (duration >= slowIndexPrepareThreshold) { - if (LOG.isDebugEnabled()) { - LOG.debug(getCallTooSlowMessage("indexPrepare", duration, slowIndexPrepareThreshold)); - } - metricSource.incrementNumSlowIndexPrepareCalls(); - } - metricSource.updateIndexPrepareTime(duration); current.addTimelineAnnotation("Built index updates, doing preStep"); TracingUtils.addAnnotation(current, "index update count", indexUpdates.size()); byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName(); @@ -651,7 +612,11 @@ public class IndexRegionObserver extends BaseRegionObserver { if (mutations == null) { return; } + + long start = EnvironmentEdgeManager.currentTimeMillis(); prepareIndexMutations(c, miniBatchOp, context, mutations); + metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start); + // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to // get different timestamps for concurrent batches that share common rows. It is very rare that the index updates // can be prepared in less than one millisecond @@ -717,7 +682,6 @@ public class IndexRegionObserver extends BaseRegionObserver { if (this.disabled) { return; } - long start = EnvironmentEdgeManager.currentTimeMillis(); BatchMutateContext context = getBatchMutateContext(c); if (context == null) { return; @@ -735,22 +699,19 @@ public class IndexRegionObserver extends BaseRegionObserver { } } finally { removeBatchMutateContext(c); - long duration = EnvironmentEdgeManager.currentTimeMillis() - start; - if (duration >= slowIndexWriteThreshold) { - if (LOG.isDebugEnabled()) { - LOG.debug(getCallTooSlowMessage("postBatchMutateIndispensably", duration, slowIndexWriteThreshold)); - } - metricSource.incrementNumSlowIndexWriteCalls(); - } - metricSource.updateIndexWriteTime(duration); } } private void doPost(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) throws IOException { + long start = EnvironmentEdgeManager.currentTimeMillis(); + try { doIndexWritesWithExceptions(context, true); + metricSource.updatePostIndexUpdateTime(EnvironmentEdgeManager.currentTimeMillis() - start); return; } catch (Throwable e) { + metricSource.updatePostIndexUpdateFailureTime(EnvironmentEdgeManager.currentTimeMillis() - start); + metricSource.incrementPostIndexUpdateFailures(); rethrowIndexingException(e); } throw new RuntimeException( @@ -772,21 +733,12 @@ public class IndexRegionObserver extends BaseRegionObserver { if (current == null) { current = NullSpan.INSTANCE; } - long start = EnvironmentEdgeManager.currentTimeMillis(); current.addTimelineAnnotation("Actually doing " + (post ? "post" : "pre") + " index update for first time"); if (post) { postWriter.writeAndHandleFailure(indexUpdates, false, context.clientVersion); } else { preWriter.writeAndHandleFailure(indexUpdates, false, context.clientVersion); } - long duration = EnvironmentEdgeManager.currentTimeMillis() - start; - if (duration >= slowIndexWriteThreshold) { - if (LOG.isDebugEnabled()) { - LOG.debug(getCallTooSlowMessage("indexWrite", duration, slowIndexWriteThreshold)); - } - metricSource.incrementNumSlowIndexWriteCalls(); - } - metricSource.updateIndexWriteTime(duration); } } @@ -805,10 +757,15 @@ public class IndexRegionObserver extends BaseRegionObserver { private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { + long start = EnvironmentEdgeManager.currentTimeMillis(); + try { doIndexWritesWithExceptions(context, false); + metricSource.updatePreIndexUpdateTime(EnvironmentEdgeManager.currentTimeMillis() - start); return; } catch (Throwable e) { + metricSource.updatePreIndexUpdateFailureTime(EnvironmentEdgeManager.currentTimeMillis() - start); + metricSource.incrementPreIndexUpdateFailures(); removePendingRows(context); rethrowIndexingException(e); } @@ -816,43 +773,6 @@ public class IndexRegionObserver extends BaseRegionObserver { "Somehow didn't complete the index update, but didn't return succesfully either!"); } - @Override - public void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c) { - Multimap<HTableInterfaceReference, Mutation> updates = failedIndexEdits.getEdits(c.getEnvironment().getRegion()); - - if (this.disabled) { - return; - } - - long start = EnvironmentEdgeManager.currentTimeMillis(); - try { - //if we have no pending edits to complete, then we are done - if (updates == null || updates.size() == 0) { - return; - } - - LOG.info("Found some outstanding index updates that didn't succeed during" - + " WAL replay - attempting to replay now."); - - // do the usual preWriter stuff - try { - preWriter.writeAndHandleFailure(updates, true, ScanUtil.UNKNOWN_CLIENT_VERSION); - } catch (IOException e) { - LOG.error("During WAL replay of outstanding index updates, " - + "Exception is thrown instead of killing server during index writing", e); - } - } finally { - long duration = EnvironmentEdgeManager.currentTimeMillis() - start; - if (duration >= slowPostOpenThreshold) { - if (LOG.isDebugEnabled()) { - LOG.debug(getCallTooSlowMessage("postOpen", duration, slowPostOpenThreshold)); - } - metricSource.incrementNumSlowPostOpenCalls(); - } - metricSource.updatePostOpenTime(duration); - } - } - /** * Exposed for testing! * @return the currently instantiated index builder diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index 66c4594..ff8b555 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -222,7 +222,7 @@ public class Indexer extends BaseRegionObserver { this.lockManager = new LockManager(); // Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat - this.metricSource = MetricsIndexerSourceFactory.getInstance().create(); + this.metricSource = MetricsIndexerSourceFactory.getInstance().getIndexerSource(); setSlowThresholds(e.getConfiguration()); try { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java index e42fccc..19df74c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java @@ -64,6 +64,21 @@ public interface MetricsIndexerSource extends BaseSource { String SLOW_POST_OPEN = "slowPostOpenCalls"; String SLOW_POST_OPEN_DESC = "The number of postOpen calls slower than the configured threshold"; + String PRE_INDEX_UPDATE_TIME = "preIndexUpdateTime"; + String PRE_INDEX_UPDATE_TIME_DESC = "Histogram for the time in milliseconds for index updates pre data updates"; + String POST_INDEX_UPDATE_TIME = "postIndexUpdateTime"; + String POST_INDEX_UPDATE_TIME_DESC = "Histogram for the time in milliseconds for index updates post data updates"; + + String PRE_INDEX_UPDATE_FAILURE_TIME = "preIndexUpdateFailureTime"; + String PRE_INDEX_UPDATE_FAILURE_TIME_DESC = "Histogram for the time in milliseconds on failures of index updates pre data updates"; + String POST_INDEX_UPDATE_FAILURE_TIME = "postIndexUpdateFailureTime"; + String POST_INDEX_UPDATE_FAILURE_TIME_DESC = "Histogram for the time in milliseconds on failures of index updates post data updates"; + + String PRE_INDEX_UPDATE_FAILURE = "preIndexUpdateFailure"; + String PRE_INDEX_UPDATE_FAILURE_DESC = "The number of failures of index updates pre data updates"; + String POST_INDEX_UPDATE_FAILURE = "postIndexUpdateFailure"; + String POST_INDEX_UPDATE_FAILURE_DESC = "The number of failures of index updates post data updates"; + /** * Updates the index preparation time histogram (preBatchMutate). * @@ -147,4 +162,43 @@ public interface MetricsIndexerSource extends BaseSource { * Increments the number of slow preIncrementAfteRowLock calls. */ void incrementSlowDuplicateKeyCheckCalls(); + + // Below metrics are introduced by IndexRegionObserver coprocessor + /** + * Updates the pre index update time histogram. + * + * @param t time taken in milliseconds + */ + void updatePreIndexUpdateTime(long t); + + /** + * Updates the post index update time histogram. + * + * @param t time taken in milliseconds + */ + void updatePostIndexUpdateTime(long t); + + /** + * Updates the pre index update failure time histogram. + * + * @param t time taken in milliseconds + */ + void updatePreIndexUpdateFailureTime(long t); + + /** + * Updates the post index update failure time histogram. + * + * @param t time taken in milliseconds + */ + void updatePostIndexUpdateFailureTime(long t); + + /** + * Increments the number of pre index update failures. + */ + void incrementPreIndexUpdateFailures(); + + /** + * Increments the number of post index update failures. + */ + void incrementPostIndexUpdateFailures(); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java index 8d97f7b..e373e2f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java @@ -21,7 +21,7 @@ package org.apache.phoenix.hbase.index.metrics; */ public class MetricsIndexerSourceFactory { private static final MetricsIndexerSourceFactory INSTANCE = new MetricsIndexerSourceFactory(); - private MetricsIndexerSource source; + private MetricsIndexerSource indexerSource; private MetricsIndexerSourceFactory() {} @@ -29,10 +29,10 @@ public class MetricsIndexerSourceFactory { return INSTANCE; } - public synchronized MetricsIndexerSource create() { - if (INSTANCE.source == null) { - INSTANCE.source = new MetricsIndexerSourceImpl(); + public synchronized MetricsIndexerSource getIndexerSource() { + if (INSTANCE.indexerSource == null) { + INSTANCE.indexerSource = new MetricsIndexerSourceImpl(); } - return INSTANCE.source; + return INSTANCE.indexerSource; } } \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java index cc82bb2..9bcf9fd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java @@ -40,6 +40,13 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl implements MetricsI private final MetricHistogram duplicateKeyTimeHisto; private final MutableFastCounter slowDuplicateKeyCalls; + private final MetricHistogram preIndexUpdateTimeHisto; + private final MetricHistogram postIndexUpdateTimeHisto; + private final MetricHistogram preIndexUpdateFailureTimeHisto; + private final MetricHistogram postIndexUpdateFailureTimeHisto; + private final MutableFastCounter preIndexUpdateFailures; + private final MutableFastCounter postIndexUpdateFailures; + public MetricsIndexerSourceImpl() { this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT); } @@ -62,6 +69,19 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl implements MetricsI slowPostOpenCalls = getMetricsRegistry().newCounter(SLOW_POST_OPEN, SLOW_POST_OPEN_DESC, 0L); duplicateKeyTimeHisto = getMetricsRegistry().newHistogram(DUPLICATE_KEY_TIME, DUPLICATE_KEY_TIME_DESC); slowDuplicateKeyCalls = getMetricsRegistry().newCounter(SLOW_DUPLICATE_KEY, SLOW_DUPLICATE_KEY_DESC, 0L); + + postIndexUpdateTimeHisto = getMetricsRegistry().newHistogram( + POST_INDEX_UPDATE_TIME, POST_INDEX_UPDATE_TIME_DESC); + preIndexUpdateTimeHisto = getMetricsRegistry().newHistogram( + PRE_INDEX_UPDATE_TIME, PRE_INDEX_UPDATE_TIME_DESC); + postIndexUpdateFailureTimeHisto = getMetricsRegistry().newHistogram( + POST_INDEX_UPDATE_FAILURE_TIME, POST_INDEX_UPDATE_FAILURE_TIME_DESC); + preIndexUpdateFailureTimeHisto = getMetricsRegistry().newHistogram( + PRE_INDEX_UPDATE_FAILURE_TIME, PRE_INDEX_UPDATE_FAILURE_TIME_DESC); + postIndexUpdateFailures = getMetricsRegistry().newCounter( + POST_INDEX_UPDATE_FAILURE, POST_INDEX_UPDATE_FAILURE_DESC, 0L); + preIndexUpdateFailures = getMetricsRegistry().newCounter( + PRE_INDEX_UPDATE_FAILURE, PRE_INDEX_UPDATE_FAILURE_DESC, 0L); } @Override @@ -133,4 +153,34 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl implements MetricsI public void incrementSlowDuplicateKeyCheckCalls() { slowDuplicateKeyCalls.incr(); } + + @Override + public void updatePreIndexUpdateTime(long t) { + preIndexUpdateTimeHisto.add(t); + } + + @Override + public void updatePostIndexUpdateTime(long t) { + postIndexUpdateTimeHisto.add(t); + } + + @Override + public void updatePreIndexUpdateFailureTime(long t) { + preIndexUpdateFailureTimeHisto.add(t); + } + + @Override + public void updatePostIndexUpdateFailureTime(long t) { + postIndexUpdateFailureTimeHisto.add(t); + } + + @Override + public void incrementPreIndexUpdateFailures() { + preIndexUpdateFailures.incr(); + } + + @Override + public void incrementPostIndexUpdateFailures() { + postIndexUpdateFailures.incr(); + } } \ No newline at end of file