Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.1 b01312bbb -> c2a738961 refs/heads/4.x-HBase-1.2 f3a524258 -> 6dea01173 refs/heads/master bff368d49 -> ca1105630
PHOENIX-4042 Add hadoop metrics2-based Indexer coproc metrics Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ca110563 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ca110563 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ca110563 Branch: refs/heads/master Commit: ca1105630dab43a8629e2efa0171d914e0140b3e Parents: bff368d Author: Josh Elser <[email protected]> Authored: Wed Jul 19 17:02:09 2017 -0400 Committer: Josh Elser <[email protected]> Committed: Wed Jul 19 17:36:02 2017 -0400 ---------------------------------------------------------------------- phoenix-core/pom.xml | 1 - .../org/apache/phoenix/hbase/index/Indexer.java | 189 ++++++++++++++++--- .../index/metrics/MetricsIndexerSource.java | 150 +++++++++++++++ .../metrics/MetricsIndexerSourceFactory.java | 38 ++++ .../index/metrics/MetricsIndexerSourceImpl.java | 136 +++++++++++++ pom.xml | 1 - 6 files changed, 483 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca110563/phoenix-core/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml index db03acf..86c92fa 100644 --- a/phoenix-core/pom.xml +++ b/phoenix-core/pom.xml @@ -437,7 +437,6 @@ <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop2-compat</artifactId> - <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hbase</groupId> http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca110563/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java index d65ad05..2d923e9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java @@ -70,6 +70,8 @@ import org.apache.htrace.TraceScope; import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment; import org.apache.phoenix.hbase.index.builder.IndexBuildManager; import org.apache.phoenix.hbase.index.builder.IndexBuilder; +import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource; +import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.IndexManagementUtil; @@ -82,6 +84,7 @@ import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache; import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy; import org.apache.phoenix.trace.TracingUtils; import org.apache.phoenix.trace.util.NullSpan; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.ServerUtil; @@ -126,6 +129,17 @@ public class Indexer extends BaseRegionObserver { private static final String INDEX_RECOVERY_FAILURE_POLICY_KEY = "org.apache.hadoop.hbase.index.recovery.failurepolicy"; + private static final String INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.post.batch.mutate.threshold"; + private static final long INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT = 3_000; + private static final String INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.batch.mutate.threshold"; + private static final long INDEXER_INDEX_PREPARE_SLOW_THREHSOLD_DEFAULT = 3_000; + private static final String INDEXER_PRE_WAL_RESTORE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.wal.restore.threshold"; + private static final long INDEXER_PRE_WAL_RESTORE_SLOW_THRESHOLD_DEFAULT = 3_000; + private static final String INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.open.threshold"; + private static final long INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT = 3_000; + private static final String INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.increment"; + private static final long INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT = 3_000; + /** * cache the failed updates to the various regions. Used for making the WAL recovery mechanisms * more robust in the face of recoverying index regions that were on the same server as the @@ -139,8 +153,15 @@ public class Indexer extends BaseRegionObserver { */ private IndexWriter recoveryWriter; + private MetricsIndexerSource metricSource; + private boolean stopped; private boolean disabled; + private long slowIndexWriteThreshold; + private long slowIndexPrepareThreshold; + private long slowPreWALRestoreThreshold; + private long slowPostOpenThreshold; + private long slowPreIncrementThreshold; public static final String RecoveryFailurePolicyKeyForTesting = INDEX_RECOVERY_FAILURE_POLICY_KEY; @@ -185,6 +206,11 @@ public class Indexer extends BaseRegionObserver { DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(clonedConfig, env); // setup the actual index writer this.writer = new IndexWriter(indexWriterEnv, serverName + "-index-writer"); + + // Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat + this.metricSource = MetricsIndexerSourceFactory.getInstance().create(); + setSlowThresholds(e.getConfiguration()); + try { // get the specified failure policy. We only ever override it in tests, but we need to do it // here @@ -206,6 +232,30 @@ public class Indexer extends BaseRegionObserver { } } + /** + * Extracts the slow call threshold values from the configuration. + */ + private void setSlowThresholds(Configuration c) { + slowIndexPrepareThreshold = c.getLong(INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY, + INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT); + slowIndexWriteThreshold = c.getLong(INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY, + INDEXER_INDEX_PREPARE_SLOW_THREHSOLD_DEFAULT); + slowPreWALRestoreThreshold = c.getLong(INDEXER_PRE_WAL_RESTORE_SLOW_THRESHOLD_KEY, + INDEXER_PRE_WAL_RESTORE_SLOW_THRESHOLD_DEFAULT); + slowPostOpenThreshold = c.getLong(INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY, + INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT); + slowPreIncrementThreshold = c.getLong(INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY, + INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT); + } + + private String getCallTooSlowMessage(String callName, long duration, long threshold) { + StringBuilder sb = new StringBuilder(64); + sb.append("(callTooSlow) ").append(callName).append(" duration=").append(duration); + sb.append("ms, threshold=").append(threshold).append("ms"); + return sb.toString(); + } + + @Override public void stop(CoprocessorEnvironment e) throws IOException { if (this.stopped) { @@ -232,6 +282,7 @@ public class Indexer extends BaseRegionObserver { @Override public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e, final Increment inc) throws IOException { + long start = EnvironmentEdgeManager.currentTimeMillis(); try { List<Mutation> mutations = this.builder.executeAtomicOp(inc); if (mutations == null) { @@ -256,6 +307,15 @@ public class Indexer extends BaseRegionObserver { "Unable to process ON DUPLICATE IGNORE for " + e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString() + "(" + Bytes.toStringBinary(inc.getRow()) + ")", t); + } finally { + long duration = EnvironmentEdgeManager.currentTimeMillis() - start; + if (duration >= slowIndexPrepareThreshold) { + if (LOG.isDebugEnabled()) { + LOG.debug(getCallTooSlowMessage("preIncrementAfterRowLock", duration, slowPreIncrementThreshold)); + } + metricSource.incrementSlowDuplicateKeyCheckCalls(); + } + metricSource.updateDuplicateKeyCheckTime(duration); } } @@ -266,11 +326,21 @@ public class Indexer extends BaseRegionObserver { super.preBatchMutate(c, miniBatchOp); return; } + long start = EnvironmentEdgeManager.currentTimeMillis(); try { preBatchMutateWithExceptions(c, miniBatchOp); return; } catch (Throwable t) { rethrowIndexingException(t); + } finally { + long duration = EnvironmentEdgeManager.currentTimeMillis() - start; + if (duration >= slowIndexPrepareThreshold) { + if (LOG.isDebugEnabled()) { + LOG.debug(getCallTooSlowMessage("preBatchMutate", duration, slowIndexPrepareThreshold)); + } + metricSource.incrementNumSlowIndexPrepareCalls(); + } + metricSource.updateIndexPrepareTime(duration); } throw new RuntimeException( "Somehow didn't return an index update but also didn't propagate the failure to the client!"); @@ -340,9 +410,21 @@ public class Indexer extends BaseRegionObserver { if (current == null) { current = NullSpan.INSTANCE; } + long start = EnvironmentEdgeManager.currentTimeMillis(); + // get the index updates for all elements in this batch Collection<Pair<Mutation, byte[]>> indexUpdates = this.builder.getIndexUpdate(miniBatchOp, mutations.values()); + + + long duration = EnvironmentEdgeManager.currentTimeMillis() - start; + if (duration >= slowIndexPrepareThreshold) { + if (LOG.isDebugEnabled()) { + LOG.debug(getCallTooSlowMessage("indexPrepare", duration, slowIndexPrepareThreshold)); + } + metricSource.incrementNumSlowIndexPrepareCalls(); + } + metricSource.updateIndexPrepareTime(duration); current.addTimelineAnnotation("Built index updates, doing preStep"); TracingUtils.addAnnotation(current, "index update count", indexUpdates.size()); byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName(); @@ -403,15 +485,27 @@ public class Indexer extends BaseRegionObserver { if (this.disabled) { super.postBatchMutateIndispensably(c, miniBatchOp, success); return; - } - this.builder.batchCompleted(miniBatchOp); - - if (success) { // if miniBatchOp was successfully written, write index updates - //each batch operation, only the first one will have anything useful, so we can just grab that - Mutation mutation = miniBatchOp.getOperation(0); - WALEdit edit = miniBatchOp.getWalEdit(0); - doPost(edit, mutation, mutation.getDurability()); - } + } + long start = EnvironmentEdgeManager.currentTimeMillis(); + try { + this.builder.batchCompleted(miniBatchOp); + + if (success) { // if miniBatchOp was successfully written, write index updates + //each batch operation, only the first one will have anything useful, so we can just grab that + Mutation mutation = miniBatchOp.getOperation(0); + WALEdit edit = miniBatchOp.getWalEdit(0); + doPost(edit, mutation, mutation.getDurability()); + } + } finally { + long duration = EnvironmentEdgeManager.currentTimeMillis() - start; + if (duration >= slowIndexWriteThreshold) { + if (LOG.isDebugEnabled()) { + LOG.debug(getCallTooSlowMessage("postBatchMutateIndispensably", duration, slowIndexWriteThreshold)); + } + metricSource.incrementNumSlowIndexWriteCalls(); + } + metricSource.updateIndexWriteTime(duration); + } } private void doPost(WALEdit edit, Mutation m, final Durability durability) throws IOException { @@ -439,6 +533,7 @@ public class Indexer extends BaseRegionObserver { if (current == null) { current = NullSpan.INSTANCE; } + long start = EnvironmentEdgeManager.currentTimeMillis(); // there is a little bit of excess here- we iterate all the non-indexed kvs for this check first // and then do it again later when getting out the index updates. This should be pretty minor @@ -477,6 +572,15 @@ public class Indexer extends BaseRegionObserver { ikv.markBatchFinished(); } } + + long duration = EnvironmentEdgeManager.currentTimeMillis() - start; + if (duration >= slowIndexWriteThreshold) { + if (LOG.isDebugEnabled()) { + LOG.debug(getCallTooSlowMessage("indexWrite", duration, slowIndexWriteThreshold)); + } + metricSource.incrementNumSlowIndexWriteCalls(); + } + metricSource.updateIndexWriteTime(duration); } } @@ -519,23 +623,35 @@ public class Indexer extends BaseRegionObserver { if (this.disabled) { super.postOpen(c); return; - } - - //if we have no pending edits to complete, then we are done - if (updates == null || updates.size() == 0) { - return; } - LOG.info("Found some outstanding index updates that didn't succeed during" - + " WAL replay - attempting to replay now."); - - // do the usual writer stuff, killing the server again, if we can't manage to make the index - // writes succeed again + long start = EnvironmentEdgeManager.currentTimeMillis(); try { - writer.writeAndKillYourselfOnFailure(updates, true); - } catch (IOException e) { - LOG.error("During WAL replay of outstanding index updates, " - + "Exception is thrown instead of killing server during index writing", e); + //if we have no pending edits to complete, then we are done + if (updates == null || updates.size() == 0) { + return; + } + + LOG.info("Found some outstanding index updates that didn't succeed during" + + " WAL replay - attempting to replay now."); + + // do the usual writer stuff, killing the server again, if we can't manage to make the index + // writes succeed again + try { + writer.writeAndKillYourselfOnFailure(updates, true); + } catch (IOException e) { + LOG.error("During WAL replay of outstanding index updates, " + + "Exception is thrown instead of killing server during index writing", e); + } + } finally { + long duration = EnvironmentEdgeManager.currentTimeMillis() - start; + if (duration >= slowPostOpenThreshold) { + if (LOG.isDebugEnabled()) { + LOG.debug(getCallTooSlowMessage("postOpen", duration, slowPostOpenThreshold)); + } + metricSource.incrementNumSlowPostOpenCalls(); + } + metricSource.updatePostOpenTime(duration); } } @@ -545,19 +661,32 @@ public class Indexer extends BaseRegionObserver { if (this.disabled) { super.preWALRestore(env, info, logKey, logEdit); return; - } + } + // TODO check the regions in transition. If the server on which the region lives is this one, // then we should rety that write later in postOpen. // we might be able to get even smarter here and pre-split the edits that are server-local // into their own recovered.edits file. This then lets us do a straightforward recovery of each // region (and more efficiently as we aren't writing quite as hectically from this one place). - /* - * Basically, we let the index regions recover for a little while long before retrying in the - * hopes they come up before the primary table finishes. - */ - Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(logEdit); - recoveryWriter.writeAndKillYourselfOnFailure(indexUpdates, true); + long start = EnvironmentEdgeManager.currentTimeMillis(); + try { + /* + * Basically, we let the index regions recover for a little while long before retrying in the + * hopes they come up before the primary table finishes. + */ + Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(logEdit); + recoveryWriter.writeAndKillYourselfOnFailure(indexUpdates, true); + } finally { + long duration = EnvironmentEdgeManager.currentTimeMillis() - start; + if (duration >= slowPreWALRestoreThreshold) { + if (LOG.isDebugEnabled()) { + LOG.debug(getCallTooSlowMessage("preWALRestore", duration, slowPreWALRestoreThreshold)); + } + metricSource.incrementNumSlowPreWALRestoreCalls(); + } + metricSource.updatePreWALRestoreTime(duration); + } } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca110563/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java new file mode 100644 index 0000000..e42fccc --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.metrics; + +import org.apache.hadoop.hbase.metrics.BaseSource; +import org.apache.phoenix.hbase.index.Indexer; + +/** + * Interface for metrics about {@link Indexer}. + */ +public interface MetricsIndexerSource extends BaseSource { + // Metrics2 and JMX constants + String METRICS_NAME = "PhoenixIndexer"; + String METRICS_CONTEXT = "phoenix"; + String METRICS_DESCRIPTION = "Metrics about the Phoenix Indexer"; + String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME; + + String INDEX_PREPARE_TIME = "indexPrepareTime"; + String INDEX_PREPARE_TIME_DESC = "Histogram for the time in milliseconds for preparing an index write"; + String SLOW_INDEX_PREPARE = "slowIndexPrepareCalls"; + String SLOW_INDEX_PREPARE_DESC = "The number of index preparations slower than the configured threshold"; + + String INDEX_WRITE_TIME = "indexWriteTime"; + String INDEX_WRITE_TIME_DESC = "Histogram for the time in milliseconds for writing an index update"; + String SLOW_INDEX_WRITE = "slowIndexWriteCalls"; + String SLOW_INDEX_WRITE_DESC = "The number of index writes slower than the configured threshold"; + + String DUPLICATE_KEY_TIME = "duplicateKeyCheckTime"; + String DUPLICATE_KEY_TIME_DESC = "Histogram for the time in milliseconds to handle ON DUPLICATE keywords"; + String SLOW_DUPLICATE_KEY = "slowDuplicateKeyCheckCalls"; + String SLOW_DUPLICATE_KEY_DESC = "The number of on duplicate key checks slower than the configured threshold"; + + String PRE_WAL_RESTORE_TIME = "preWALRestoreTime"; + String PRE_WAL_RESTORE_TIME_DESC = "Histogram for the time in milliseconds for Indexer's preWALRestore"; + String SLOW_PRE_WAL_RESTORE = "slowPreWALRestoreCalls"; + String SLOW_PRE_WAL_RESTORE_DESC = "The number of preWALRestore calls slower than the configured threshold"; + + String POST_PUT_TIME = "postPutTime"; + String POST_PUT_TIME_DESC = "Histogram for the time in milliseconds for Indexer's postPut"; + String SLOW_POST_PUT = "slowPostPutCalls"; + String SLOW_POST_PUT_DESC = "The number of postPut calls slower than the configured threshold"; + + String POST_DELETE_TIME = "postDeleteTime"; + String POST_DELETE_TIME_DESC = "Histogram for the time in milliseconds for Indexer's postDelete"; + String SLOW_POST_DELETE = "slowPostDeleteCalls"; + String SLOW_POST_DELETE_DESC = "The number of postDelete calls slower than the configured threshold"; + + String POST_OPEN_TIME = "postOpenTime"; + String POST_OPEN_TIME_DESC = "Histogram for the time in milliseconds for Indexer's postOpen"; + String SLOW_POST_OPEN = "slowPostOpenCalls"; + String SLOW_POST_OPEN_DESC = "The number of postOpen calls slower than the configured threshold"; + + /** + * Updates the index preparation time histogram (preBatchMutate). + * + * @param t time taken in milliseconds + */ + void updateIndexPrepareTime(long t); + + /** + * Increments the number of slow calls prepare an index write. + */ + void incrementNumSlowIndexPrepareCalls(); + + /** + * Updates the index write time histogram (postBatchMutate). + * + * @param t time taken in milliseconds + */ + void updateIndexWriteTime(long t); + + /** + * Increments the number of slow calls to write to the index. + */ + void incrementNumSlowIndexWriteCalls(); + + /** + * Updates the preWALRestore time histogram. + * + * @param t time taken in milliseconds + */ + void updatePreWALRestoreTime(long t); + + /** + * Increments the number of slow preWALRestore calls. + */ + void incrementNumSlowPreWALRestoreCalls(); + + /** + * Updates the postPut time histogram. + * + * @param t time taken in milliseconds + */ + void updatePostPutTime(long t); + + /** + * Increments the number of slow postPut calls. + */ + void incrementNumSlowPostPutCalls(); + + /** + * Updates the postDelete time histogram. + * + * @param t time taken in milliseconds + */ + void updatePostDeleteTime(long t); + + /** + * Increments the number of slow postDelete calls. + */ + void incrementNumSlowPostDeleteCalls(); + + /** + * Updates the postOpen time histogram. + * + * @param t time taken in milliseconds + */ + void updatePostOpenTime(long t); + + /** + * Increments the number of slow postOpen calls. + */ + void incrementNumSlowPostOpenCalls(); + + /** + * Updates the preIncrementAfterRowLock time histogram. + * + * @param t time taken in milliseconds + */ + void updateDuplicateKeyCheckTime(long t); + + /** + * Increments the number of slow preIncrementAfteRowLock calls. + */ + void incrementSlowDuplicateKeyCheckCalls(); +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca110563/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java new file mode 100644 index 0000000..8d97f7b --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.metrics; + +/** + * Factory class for creating {@link MetricsIndexerSource} instances. + */ +public class MetricsIndexerSourceFactory { + private static final MetricsIndexerSourceFactory INSTANCE = new MetricsIndexerSourceFactory(); + private MetricsIndexerSource source; + + private MetricsIndexerSourceFactory() {} + + public static MetricsIndexerSourceFactory getInstance() { + return INSTANCE; + } + + public synchronized MetricsIndexerSource create() { + if (INSTANCE.source == null) { + INSTANCE.source = new MetricsIndexerSourceImpl(); + } + return INSTANCE.source; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca110563/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java new file mode 100644 index 0000000..cc82bb2 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.hbase.index.metrics; + +import org.apache.hadoop.hbase.metrics.BaseSourceImpl; +import org.apache.hadoop.metrics2.MetricHistogram; +import org.apache.hadoop.metrics2.lib.MutableFastCounter; + +/** + * Implementation for tracking Phoenix Indexer metrics. + */ +public class MetricsIndexerSourceImpl extends BaseSourceImpl implements MetricsIndexerSource { + + private final MetricHistogram indexPrepareTimeHisto; + private final MutableFastCounter slowIndexPrepareCalls; + private final MetricHistogram indexWriteTimeHisto; + private final MutableFastCounter slowIndexWriteCalls; + private final MetricHistogram preWALRestoreTimeHisto; + private final MutableFastCounter slowPreWALRestoreCalls; + private final MetricHistogram postPutTimeHisto; + private final MutableFastCounter slowPostPutCalls; + private final MetricHistogram postDeleteTimeHisto; + private final MutableFastCounter slowPostDeleteCalls; + private final MetricHistogram postOpenTimeHisto; + private final MutableFastCounter slowPostOpenCalls; + private final MetricHistogram duplicateKeyTimeHisto; + private final MutableFastCounter slowDuplicateKeyCalls; + + public MetricsIndexerSourceImpl() { + this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT); + } + + public MetricsIndexerSourceImpl(String metricsName, String metricsDescription, + String metricsContext, String metricsJmxContext) { + super(metricsName, metricsDescription, metricsContext, metricsJmxContext); + + indexPrepareTimeHisto = getMetricsRegistry().newHistogram(INDEX_PREPARE_TIME, INDEX_PREPARE_TIME_DESC); + slowIndexPrepareCalls = getMetricsRegistry().newCounter(SLOW_INDEX_PREPARE, SLOW_INDEX_PREPARE_DESC, 0L); + indexWriteTimeHisto = getMetricsRegistry().newHistogram(INDEX_WRITE_TIME, INDEX_WRITE_TIME_DESC); + slowIndexWriteCalls = getMetricsRegistry().newCounter(SLOW_INDEX_WRITE, SLOW_INDEX_WRITE_DESC, 0L); + preWALRestoreTimeHisto = getMetricsRegistry().newHistogram(PRE_WAL_RESTORE_TIME, PRE_WAL_RESTORE_TIME_DESC); + slowPreWALRestoreCalls = getMetricsRegistry().newCounter(SLOW_PRE_WAL_RESTORE, SLOW_PRE_WAL_RESTORE_DESC, 0L); + postPutTimeHisto = getMetricsRegistry().newHistogram(POST_PUT_TIME, POST_PUT_TIME_DESC); + slowPostPutCalls = getMetricsRegistry().newCounter(SLOW_POST_PUT, SLOW_POST_PUT_DESC, 0L); + postDeleteTimeHisto = getMetricsRegistry().newHistogram(POST_DELETE_TIME, POST_DELETE_TIME_DESC); + slowPostDeleteCalls = getMetricsRegistry().newCounter(SLOW_POST_DELETE, SLOW_POST_DELETE_DESC, 0L); + postOpenTimeHisto = getMetricsRegistry().newHistogram(POST_OPEN_TIME, POST_OPEN_TIME_DESC); + slowPostOpenCalls = getMetricsRegistry().newCounter(SLOW_POST_OPEN, SLOW_POST_OPEN_DESC, 0L); + duplicateKeyTimeHisto = getMetricsRegistry().newHistogram(DUPLICATE_KEY_TIME, DUPLICATE_KEY_TIME_DESC); + slowDuplicateKeyCalls = getMetricsRegistry().newCounter(SLOW_DUPLICATE_KEY, SLOW_DUPLICATE_KEY_DESC, 0L); + } + + @Override + public void updateIndexPrepareTime(long t) { + indexPrepareTimeHisto.add(t); + } + + @Override + public void updateIndexWriteTime(long t) { + indexWriteTimeHisto.add(t); + } + + @Override + public void updatePreWALRestoreTime(long t) { + preWALRestoreTimeHisto.add(t); + } + + @Override + public void updatePostPutTime(long t) { + postPutTimeHisto.add(t); + } + + @Override + public void updatePostDeleteTime(long t) { + postDeleteTimeHisto.add(t); + } + + @Override + public void updatePostOpenTime(long t) { + postOpenTimeHisto.add(t); + } + + @Override + public void incrementNumSlowIndexPrepareCalls() { + slowIndexPrepareCalls.incr(); + } + + @Override + public void incrementNumSlowIndexWriteCalls() { + slowIndexWriteCalls.incr(); + } + + @Override + public void incrementNumSlowPreWALRestoreCalls() { + slowPreWALRestoreCalls.incr(); + } + + @Override + public void incrementNumSlowPostPutCalls() { + slowPostPutCalls.incr(); + } + + @Override + public void incrementNumSlowPostDeleteCalls() { + slowPostDeleteCalls.incr(); + } + + @Override + public void incrementNumSlowPostOpenCalls() { + slowPostOpenCalls.incr(); + } + + @Override + public void updateDuplicateKeyCheckTime(long t) { + duplicateKeyTimeHisto.add(t); + } + + @Override + public void incrementSlowDuplicateKeyCheckCalls() { + slowDuplicateKeyCalls.incr(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/ca110563/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a46c50c..b2009a4 100644 --- a/pom.xml +++ b/pom.xml @@ -664,7 +664,6 @@ <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop2-compat</artifactId> <version>${hbase.version}</version> - <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hbase</groupId>
