http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index 857a952..57fa25a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.execute; +import static org.apache.phoenix.monitoring.TaskExecutionMetricsHolder.NO_OP_INSTANCE; import static org.apache.phoenix.util.LogUtil.addCustomAnnotations; import java.sql.SQLException; @@ -54,6 +55,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.job.JobManager.JobCallable; import org.apache.phoenix.join.HashCacheClient; import org.apache.phoenix.join.HashJoinInfo; +import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; import org.apache.phoenix.parse.FilterableStatement; import org.apache.phoenix.parse.ParseNode; import org.apache.phoenix.parse.SQLParser; @@ -140,6 +142,11 @@ public class HashJoinPlan extends DelegateQueryPlan { public Object getJobId() { return HashJoinPlan.this; } + + @Override + public TaskExecutionMetricsHolder getTaskExecutionMetric() { + return NO_OP_INSTANCE; + } })); }
http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 99f41b2..af3bcf3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -17,6 +17,10 @@ */ package org.apache.phoenix.execute; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BATCH_SIZE; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_COMMIT_TIME; + import java.io.IOException; import java.sql.SQLException; import java.util.Arrays; @@ -39,7 +43,11 @@ import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.IndexMetaDataCacheClient; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.monitoring.PhoenixMetrics; +import org.apache.phoenix.monitoring.GlobalClientMetrics; +import org.apache.phoenix.monitoring.MutationMetricQueue; +import org.apache.phoenix.monitoring.MutationMetricQueue.MutationMetric; +import org.apache.phoenix.monitoring.MutationMetricQueue.NoOpMutationMetricsQueue; +import org.apache.phoenix.monitoring.ReadMetricQueue; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.IllegalDataException; import org.apache.phoenix.schema.MetaDataClient; @@ -65,9 +73,6 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.sun.istack.NotNull; -import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BYTES; -import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_BATCH_SIZE; -import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.MUTATION_COMMIT_TIME; /** * @@ -85,11 +90,17 @@ public class MutationState implements SQLCloseable { private final Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations; private long sizeOffset; private int numRows = 0; + private final MutationMetricQueue mutationMetricQueue; + private ReadMetricQueue readMetricQueue; - MutationState(long maxSize, PhoenixConnection connection, Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> mutations) { + MutationState(long maxSize, PhoenixConnection connection, + Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations) { this.maxSize = maxSize; this.connection = connection; this.mutations = mutations; + boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled(); + this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue() + : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE; } public MutationState(long maxSize, PhoenixConnection connection) { @@ -108,6 +119,12 @@ public class MutationState implements SQLCloseable { throwIfTooBig(); } + public static MutationState emptyMutationState(long maxSize, PhoenixConnection connection) { + MutationState state = new MutationState(maxSize, connection, Collections.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>emptyMap()); + state.sizeOffset = 0; + return state; + } + private void throwIfTooBig() { if (numRows > maxSize) { // TODO: throw SQLException ? @@ -120,17 +137,18 @@ public class MutationState implements SQLCloseable { } /** - * Combine a newer mutation with this one, where in the event of overlaps, - * the newer one will take precedence. - * @param newMutation the newer mutation + * Combine a newer mutation with this one, where in the event of overlaps, the newer one will take precedence. + * Combine any metrics collected for the newer mutation. + * + * @param newMutationState the newer mutation state */ - public void join(MutationState newMutation) { - if (this == newMutation) { // Doesn't make sense + public void join(MutationState newMutationState) { + if (this == newMutationState) { // Doesn't make sense return; } - this.sizeOffset += newMutation.sizeOffset; + this.sizeOffset += newMutationState.sizeOffset; // Merge newMutation with this one, keeping state from newMutation for any overlaps - for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutation.mutations.entrySet()) { + for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : newMutationState.mutations.entrySet()) { // Replace existing entries for the table with new entries TableRef tableRef = entry.getKey(); PTable table = tableRef.getTable(); @@ -168,6 +186,12 @@ public class MutationState implements SQLCloseable { } } } + mutationMetricQueue.combineMetricQueues(newMutationState.mutationMetricQueue); + if (readMetricQueue == null) { + readMetricQueue = newMutationState.readMetricQueue; + } else if (readMetricQueue != null && newMutationState.readMetricQueue != null) { + readMetricQueue.combineReadMetrics(newMutationState.readMetricQueue); + } throwIfTooBig(); } @@ -332,18 +356,15 @@ public class MutationState implements SQLCloseable { return timeStamps; } - private static void logMutationSize(HTableInterface htable, List<Mutation> mutations, PhoenixConnection connection) { + private static long calculateMutationSize(List<Mutation> mutations) { long byteSize = 0; - int keyValueCount = 0; - if (PhoenixMetrics.isMetricsEnabled() || logger.isDebugEnabled()) { + if (GlobalClientMetrics.isMetricsEnabled()) { for (Mutation mutation : mutations) { byteSize += mutation.heapSize(); } - MUTATION_BYTES.update(byteSize); - if (logger.isDebugEnabled()) { - logger.debug(LogUtil.addCustomAnnotations("Sending " + mutations.size() + " mutations for " + Bytes.toString(htable.getTableName()) + " with " + keyValueCount + " key values of total size " + byteSize + " bytes", connection)); - } } + GLOBAL_MUTATION_BYTES.update(byteSize); + return byteSize; } @SuppressWarnings("deprecation") @@ -352,126 +373,134 @@ public class MutationState implements SQLCloseable { byte[] tenantId = connection.getTenantId() == null ? null : connection.getTenantId().getBytes(); long[] serverTimeStamps = validate(); Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>>> iterator = this.mutations.entrySet().iterator(); - // add tracing for this operation - TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables"); - Span span = trace.getSpan(); - while (iterator.hasNext()) { - Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next(); - Map<ImmutableBytesPtr,RowMutationState> valuesMap = entry.getValue(); - TableRef tableRef = entry.getKey(); - PTable table = tableRef.getTable(); - table.getIndexMaintainers(tempPtr, connection); - boolean hasIndexMaintainers = tempPtr.getLength() > 0; - boolean isDataTable = true; - long serverTimestamp = serverTimeStamps[i++]; - Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false); - while (mutationsIterator.hasNext()) { - Pair<byte[],List<Mutation>> pair = mutationsIterator.next(); - byte[] htableName = pair.getFirst(); - List<Mutation> mutations = pair.getSecond(); - - //create a span per target table - //TODO maybe we can be smarter about the table name to string here? - Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName)); + try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) { + Span span = trace.getSpan(); + while (iterator.hasNext()) { + Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry = iterator.next(); + // at this point we are going through mutations for each table - int retryCount = 0; - boolean shouldRetry = false; - do { - ServerCache cache = null; - if (hasIndexMaintainers && isDataTable) { - byte[] attribValue = null; - byte[] uuidValue; - if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) { - IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); - cache = client.addIndexMetadataCache(mutations, tempPtr); - child.addTimelineAnnotation("Updated index metadata cache"); - uuidValue = cache.getId(); - // If we haven't retried yet, retry for this case only, as it's possible that - // a split will occur after we send the index metadata cache to all known - // region servers. - shouldRetry = true; - } else { - attribValue = ByteUtil.copyKeyBytesIfNecessary(tempPtr); - uuidValue = ServerCacheClient.generateId(); - } - // Either set the UUID to be able to access the index metadata from the cache - // or set the index metadata directly on the Mutation - for (Mutation mutation : mutations) { - if (tenantId != null) { - mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); - } - mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); - if (attribValue != null) { - mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue); - } - } - } - - SQLException sqlE = null; - HTableInterface hTable = connection.getQueryServices().getTable(htableName); - try { - logMutationSize(hTable, mutations, connection); - MUTATION_BATCH_SIZE.update(mutations.size()); - long startTime = System.currentTimeMillis(); - child.addTimelineAnnotation("Attempt " + retryCount); - hTable.batch(mutations); - child.stop(); - long duration = System.currentTimeMillis() - startTime; - MUTATION_COMMIT_TIME.update(duration); - shouldRetry = false; - if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Total time for batch call of " + mutations.size() + " mutations into " + table.getName().getString() + ": " + duration + " ms", connection)); - } catch (Exception e) { - SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e); - if (inferredE != null) { - if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) { - // Swallow this exception once, as it's possible that we split after sending the index metadata - // and one of the region servers doesn't have it. This will cause it to have it the next go around. - // If it fails again, we don't retry. - String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE; - logger.warn(LogUtil.addCustomAnnotations(msg, connection)); - connection.getQueryServices().clearTableRegionCache(htableName); + Map<ImmutableBytesPtr,RowMutationState> valuesMap = entry.getValue(); + // above is mutations for a table where the first part is the row key and the second part is column values. - // add a new child span as this one failed - child.addTimelineAnnotation(msg); - child.stop(); - child = Tracing.child(span,"Failed batch, attempting retry"); + TableRef tableRef = entry.getKey(); + PTable table = tableRef.getTable(); + table.getIndexMaintainers(tempPtr, connection); + boolean hasIndexMaintainers = tempPtr.getLength() > 0; + boolean isDataTable = true; + long serverTimestamp = serverTimeStamps[i++]; + Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false); + // above returns an iterator of pair where the first + while (mutationsIterator.hasNext()) { + Pair<byte[],List<Mutation>> pair = mutationsIterator.next(); + byte[] htableName = pair.getFirst(); + List<Mutation> mutations = pair.getSecond(); - continue; + //create a span per target table + //TODO maybe we can be smarter about the table name to string here? + Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName)); + + int retryCount = 0; + boolean shouldRetry = false; + do { + ServerCache cache = null; + if (hasIndexMaintainers && isDataTable) { + byte[] attribValue = null; + byte[] uuidValue; + if (IndexMetaDataCacheClient.useIndexMetadataCache(connection, mutations, tempPtr.getLength())) { + IndexMetaDataCacheClient client = new IndexMetaDataCacheClient(connection, tableRef); + cache = client.addIndexMetadataCache(mutations, tempPtr); + child.addTimelineAnnotation("Updated index metadata cache"); + uuidValue = cache.getId(); + // If we haven't retried yet, retry for this case only, as it's possible that + // a split will occur after we send the index metadata cache to all known + // region servers. + shouldRetry = true; + } else { + attribValue = ByteUtil.copyKeyBytesIfNecessary(tempPtr); + uuidValue = ServerCacheClient.generateId(); + } + // Either set the UUID to be able to access the index metadata from the cache + // or set the index metadata directly on the Mutation + for (Mutation mutation : mutations) { + if (tenantId != null) { + mutation.setAttribute(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + } + mutation.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue); + if (attribValue != null) { + mutation.setAttribute(PhoenixIndexCodec.INDEX_MD, attribValue); + } } - e = inferredE; } - sqlE = new CommitException(e, getUncommittedSattementIndexes()); - } finally { + + SQLException sqlE = null; + HTableInterface hTable = connection.getQueryServices().getTable(htableName); try { - hTable.close(); - } catch (IOException e) { - if (sqlE != null) { - sqlE.setNextException(ServerUtil.parseServerException(e)); - } else { - sqlE = ServerUtil.parseServerException(e); + long numMutations = mutations.size(); + GLOBAL_MUTATION_BATCH_SIZE.update(numMutations); + + long startTime = System.currentTimeMillis(); + child.addTimelineAnnotation("Attempt " + retryCount); + hTable.batch(mutations); + child.stop(); + shouldRetry = false; + long mutationCommitTime = System.currentTimeMillis() - startTime; + GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime); + + long mutationSizeBytes = calculateMutationSize(mutations); + MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime); + mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric); + } catch (Exception e) { + SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e); + if (inferredE != null) { + if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) { + // Swallow this exception once, as it's possible that we split after sending the index metadata + // and one of the region servers doesn't have it. This will cause it to have it the next go around. + // If it fails again, we don't retry. + String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE; + logger.warn(LogUtil.addCustomAnnotations(msg, connection)); + connection.getQueryServices().clearTableRegionCache(htableName); + + // add a new child span as this one failed + child.addTimelineAnnotation(msg); + child.stop(); + child = Tracing.child(span,"Failed batch, attempting retry"); + + continue; + } + e = inferredE; } + sqlE = new CommitException(e, getUncommittedStatementIndexes()); } finally { try { - if (cache != null) { - cache.close(); + hTable.close(); + } catch (IOException e) { + if (sqlE != null) { + sqlE.setNextException(ServerUtil.parseServerException(e)); + } else { + sqlE = ServerUtil.parseServerException(e); } } finally { - if (sqlE != null) { - throw sqlE; + try { + if (cache != null) { + cache.close(); + } + } finally { + if (sqlE != null) { + throw sqlE; + } } } } - } - } while (shouldRetry && retryCount++ < 1); - isDataTable = false; - } - if (tableRef.getTable().getType() != PTableType.INDEX) { - numRows -= entry.getValue().size(); + } while (shouldRetry && retryCount++ < 1); + isDataTable = false; + } + if (tableRef.getTable().getType() != PTableType.INDEX) { + numRows -= entry.getValue().size(); + } + iterator.remove(); // Remove batches as we process them } - iterator.remove(); // Remove batches as we process them } - trace.close(); assert(numRows==0); assert(this.mutations.isEmpty()); } @@ -481,7 +510,7 @@ public class MutationState implements SQLCloseable { numRows = 0; } - private int[] getUncommittedSattementIndexes() { + private int[] getUncommittedStatementIndexes() { int[] result = new int[0]; for (Map<ImmutableBytesPtr, RowMutationState> rowMutations : mutations.values()) { for (RowMutationState rowMutationState : rowMutations.values()) { @@ -533,12 +562,23 @@ public class MutationState implements SQLCloseable { int[] getStatementIndexes() { return statementIndexes; } - + void join(RowMutationState newRow) { getColumnValues().putAll(newRow.getColumnValues()); statementIndexes = joinSortedIntArrays(statementIndexes, newRow.getStatementIndexes()); } - + } + + public ReadMetricQueue getReadMetricQueue() { + return readMetricQueue; + } + public void setReadMetricQueue(ReadMetricQueue readMetricQueue) { + this.readMetricQueue = readMetricQueue; } + + public MutationMetricQueue getMutationMetricQueue() { + return mutationMetricQueue; + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java index 031b58b..2bed3a0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java @@ -49,7 +49,7 @@ public class UnionPlan implements QueryPlan { private final FilterableStatement statement; private final ParameterMetaData paramMetaData; private final OrderBy orderBy; - private final StatementContext context; + private final StatementContext parentContext; private final Integer limit; private final GroupBy groupBy; private final RowProjector projector; @@ -59,7 +59,7 @@ public class UnionPlan implements QueryPlan { public UnionPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, OrderBy orderBy, GroupBy groupBy, List<QueryPlan> plans, ParameterMetaData paramMetaData) throws SQLException { - this.context = context; + this.parentContext = context; this.statement = statement; this.tableRef = table; this.projector = projector; @@ -128,7 +128,7 @@ public class UnionPlan implements QueryPlan { } public final ResultIterator iterator(final List<? extends SQLCloseable> dependencies) throws SQLException { - this.iterators = new UnionResultIterators(plans); + this.iterators = new UnionResultIterators(plans, parentContext); ResultIterator scanner; boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty(); @@ -175,7 +175,7 @@ public class UnionPlan implements QueryPlan { @Override public StatementContext getContext() { - return context; + return parentContext; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index 6a3847b..43731cb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -18,8 +18,8 @@ package org.apache.phoenix.iterate; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY; -import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.FAILED_QUERY; -import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_TIMEOUT; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER; import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY; import java.sql.SQLException; @@ -540,12 +540,13 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } catch (ExecutionException e) { try { // Rethrow as SQLException throw ServerUtil.parseServerException(e); - } catch (StaleRegionBoundaryCacheException e2) { + } catch (StaleRegionBoundaryCacheException e2) { // Catch only to try to recover from region boundary cache being out of date List<List<Pair<Scan,Future<PeekingResultIterator>>>> newFutures = Lists.newArrayListWithExpectedSize(2); if (!clearedCache) { // Clear cache once so that we rejigger job based on new boundaries services.clearTableRegionCache(physicalTableName); clearedCache = true; + context.getOverallQueryMetrics().cacheRefreshedDueToSplits(); } // Resubmit just this portion of work again Scan oldScan = scanPair.getFirst(); @@ -582,7 +583,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result success = true; return iterators; } catch (TimeoutException e) { - QUERY_TIMEOUT.increment(); + context.getOverallQueryMetrics().queryTimedOut(); + GLOBAL_QUERY_TIMEOUT_COUNTER.increment(); // thrown when a thread times out waiting for the future.get() call to return toThrow = new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT) .setMessage(". Query couldn't be completed in the alloted time: " + queryTimeOut + " ms") @@ -616,7 +618,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } } finally { if (toThrow != null) { - FAILED_QUERY.increment(); + GLOBAL_FAILED_QUERY_COUNTER.increment(); + context.getOverallQueryMetrics().queryFailed(); throw toThrow; } } @@ -639,7 +642,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result if (futurePair != null) { Future<PeekingResultIterator> future = futurePair.getSecond(); if (future != null) { - cancelledWork |= future.cancel(false); + future.cancel(false); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java index e1ee8db..f272e55 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java @@ -19,6 +19,7 @@ package org.apache.phoenix.iterate; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.STARTKEY_OFFSET; +import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES; import java.sql.SQLException; import java.util.List; @@ -66,18 +67,17 @@ public class ChunkedResultIterator implements PeekingResultIterator { } @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException { - scanner.close(); //close the iterator since we don't need it anymore. + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String tableName) throws SQLException { if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("ChunkedResultIteratorFactory.newIterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan))); return new ChunkedResultIterator(delegateFactory, context, tableRef, scan, context.getConnection().getQueryServices().getProps().getLong( QueryServices.SCAN_RESULT_CHUNK_SIZE, - QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE)); + QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE), scanner); } } - public ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory, - StatementContext context, TableRef tableRef, Scan scan, long chunkSize) throws SQLException { + private ChunkedResultIterator(ParallelIteratorFactory delegateIteratorFactory, + StatementContext context, TableRef tableRef, Scan scan, long chunkSize, ResultIterator scanner) throws SQLException { this.delegateIteratorFactory = delegateIteratorFactory; this.context = context; this.tableRef = tableRef; @@ -87,9 +87,9 @@ public class ChunkedResultIterator implements PeekingResultIterator { // to get parallel scans kicked off in separate threads. If we delay this, // we'll get serialized behavior (see PHOENIX- if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get first chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan))); - ResultIterator singleChunkResultIterator = new SingleChunkResultIterator( - new TableResultIterator(context, tableRef, scan), chunkSize); - resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan); + ResultIterator singleChunkResultIterator = new SingleChunkResultIterator(scanner, chunkSize); + String tableName = tableRef.getTable().getPhysicalName().getString(); + resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName); } @Override @@ -118,9 +118,10 @@ public class ChunkedResultIterator implements PeekingResultIterator { scan = ScanUtil.newScan(scan); scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey)); if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get next chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan))); + String tableName = tableRef.getTable().getPhysicalName().getString(); ResultIterator singleChunkResultIterator = new SingleChunkResultIterator( - new TableResultIterator(context, tableRef, scan), chunkSize); - resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan); + new TableResultIterator(context, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName)), chunkSize); + resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan, tableName); } return resultIterator; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java index df8f658..f25e373 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java @@ -25,10 +25,10 @@ import org.apache.phoenix.compile.StatementContext; public interface ParallelIteratorFactory { public static ParallelIteratorFactory NOOP_FACTORY = new ParallelIteratorFactory() { @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName) throws SQLException { return LookAheadResultIterator.wrap(scanner); } }; - PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException; + PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName) throws SQLException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index be10c20..2dfbfe3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -17,7 +17,7 @@ */ package org.apache.phoenix.iterate; -import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.PARALLEL_SCANS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_NUM_PARALLEL_SCANS; import java.sql.SQLException; import java.util.Collections; @@ -30,6 +30,10 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.job.JobManager.JobCallable; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.monitoring.CombinableMetric; +import org.apache.phoenix.monitoring.ReadMetricQueue; +import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.ScanUtil; @@ -79,19 +83,25 @@ public class ParallelIterators extends BaseResultIterators { // Shuffle so that we start execution across many machines // before we fill up the thread pool Collections.shuffle(scanLocations); - PARALLEL_SCANS.update(scanLocations.size()); + ReadMetricQueue readMetrics = context.getReadMetricsQueue(); + final String physicalTableName = tableRef.getTable().getPhysicalName().getString(); + int numScans = scanLocations.size(); + context.getOverallQueryMetrics().updateNumParallelScans(numScans); + GLOBAL_NUM_PARALLEL_SCANS.update(numScans); for (ScanLocator scanLocation : scanLocations) { final Scan scan = scanLocation.getScan(); + final CombinableMetric scanMetrics = readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName); + final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName); Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { - + @Override public PeekingResultIterator call() throws Exception { long startTime = System.currentTimeMillis(); - ResultIterator scanner = new TableResultIterator(context, tableRef, scan); + ResultIterator scanner = new TableResultIterator(context, tableRef, scan, scanMetrics); if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan))); } - PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan); + PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan, physicalTableName); // Fill the scanner's cache. This helps reduce latency since we are parallelizing the I/O needed. iterator.peek(); @@ -109,6 +119,11 @@ public class ParallelIterators extends BaseResultIterators { public Object getJobId() { return ParallelIterators.this; } + + @Override + public TaskExecutionMetricsHolder getTaskExecutionMetric() { + return taskMetrics; + } }, "Parallel scanner for table: " + tableRef.getTable().getName().getString())); // Add our future in the right place so that we can concatenate the // results of the inner futures versus merge sorting across all of them. http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java index 4a9ad3e..92ac570 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RoundRobinResultIterator.java @@ -18,7 +18,7 @@ package org.apache.phoenix.iterate; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.FAILED_QUERY; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER; import java.sql.SQLException; import java.util.ArrayList; @@ -268,7 +268,7 @@ public class RoundRobinResultIterator implements ResultIterator { } } finally { if (toThrow != null) { - FAILED_QUERY.increment(); + GLOBAL_FAILED_QUERY_COUNTER.increment(); throw toThrow; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java index fd65d0c..b722794 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java @@ -17,7 +17,7 @@ */ package org.apache.phoenix.iterate; -import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.SCAN_BYTES; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES; import java.io.IOException; import java.sql.SQLException; @@ -28,15 +28,20 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.phoenix.monitoring.PhoenixMetrics; +import org.apache.phoenix.monitoring.CombinableMetric.NoOpRequestMetric; +import org.apache.phoenix.monitoring.GlobalClientMetrics; +import org.apache.phoenix.monitoring.CombinableMetric; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ServerUtil; public class ScanningResultIterator implements ResultIterator { private final ResultScanner scanner; - public ScanningResultIterator(ResultScanner scanner) { + private final CombinableMetric scanMetrics; + + public ScanningResultIterator(ResultScanner scanner, CombinableMetric scanMetrics) { this.scanner = scanner; + this.scanMetrics = scanMetrics; } @Override @@ -66,17 +71,18 @@ public class ScanningResultIterator implements ResultIterator { return "ScanningResultIterator [scanner=" + scanner + "]"; } - private static void calculateScanSize(Result result) { - if (PhoenixMetrics.isMetricsEnabled()) { - if (result != null) { - Cell[] cells = result.rawCells(); - long scanResultSize = 0; - for (Cell cell : cells) { - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - scanResultSize += kv.heapSize(); - } - SCAN_BYTES.update(scanResultSize); - } - } - } + private void calculateScanSize(Result result) { + if (GlobalClientMetrics.isMetricsEnabled() || scanMetrics != NoOpRequestMetric.INSTANCE) { + if (result != null) { + Cell[] cells = result.rawCells(); + long scanResultSize = 0; + for (Cell cell : cells) { + KeyValue kv = KeyValueUtil.ensureKeyValue(cell); + scanResultSize += kv.heapSize(); + } + scanMetrics.change(scanResultSize); + GLOBAL_SCAN_BYTES.update(scanResultSize); + } + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java index 6b3b5e3..516d73e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.iterate; +import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES; + import java.sql.SQLException; import java.util.Collections; import java.util.List; @@ -29,11 +31,9 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.iterate.TableResultIterator.ScannerCreation; import org.apache.phoenix.job.JobManager.JobCallable; +import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; import org.apache.phoenix.trace.util.Tracing; -import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.ScanUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -48,7 +48,6 @@ import com.google.common.collect.Lists; * @since 0.1 */ public class SerialIterators extends BaseResultIterators { - private static final Logger logger = LoggerFactory.getLogger(SerialIterators.class); private static final String NAME = "SERIAL"; private final ParallelIteratorFactory iteratorFactory; @@ -74,18 +73,15 @@ public class SerialIterators extends BaseResultIterators { Scan lastScan = scans.get(scans.size()-1); final Scan overallScan = ScanUtil.newScan(firstScan); overallScan.setStopRow(lastScan.getStopRow()); + final String tableName = tableRef.getTable().getPhysicalName().getString(); + final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(context.getReadMetricsQueue(), tableName); Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() { - @Override public PeekingResultIterator call() throws Exception { List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(scans.size()); for (final Scan scan : scans) { - long startTime = System.currentTimeMillis(); - ResultIterator scanner = new TableResultIterator(context, tableRef, scan, ScannerCreation.DELAYED); - if (logger.isDebugEnabled()) { - logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + scan, ScanUtil.getCustomAnnotations(scan))); - } - concatIterators.add(iteratorFactory.newIterator(context, scanner, scan)); + ResultIterator scanner = new TableResultIterator(context, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), ScannerCreation.DELAYED); + concatIterators.add(iteratorFactory.newIterator(context, scanner, scan, tableName)); } PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators); allIterators.add(concatIterator); @@ -101,6 +97,11 @@ public class SerialIterators extends BaseResultIterators { public Object getJobId() { return SerialIterators.this; } + + @Override + public TaskExecutionMetricsHolder getTaskExecutionMetric() { + return taskMetrics; + } }, "Serial scanner for table: " + tableRef.getTable().getName().getString())); // Add our singleton Future which will execute serially nestedFutures.add(Collections.singletonList(new Pair<Scan,Future<PeekingResultIterator>>(overallScan,future))); http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java index 63d3761..0a3c32b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java @@ -17,8 +17,10 @@ */ package org.apache.phoenix.iterate; -import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.NUM_SPOOL_FILE; -import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.SPOOL_FILE_SIZE; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MEMORY_CHUNK_BYTES; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MEMORY_WAIT_TIME; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_SIZE; import java.io.BufferedInputStream; import java.io.DataInputStream; @@ -37,6 +39,9 @@ import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.memory.MemoryManager; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; +import org.apache.phoenix.monitoring.MemoryMetricsHolder; +import org.apache.phoenix.monitoring.ReadMetricQueue; +import org.apache.phoenix.monitoring.SpoolingMetricsHolder; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.tuple.ResultTuple; @@ -55,8 +60,10 @@ import org.apache.phoenix.util.TupleUtil; * @since 0.1 */ public class SpoolingResultIterator implements PeekingResultIterator { - private final PeekingResultIterator spoolFrom; + private final PeekingResultIterator spoolFrom; + private final SpoolingMetricsHolder spoolMetrics; + private final MemoryMetricsHolder memoryMetrics; public static class SpoolingResultIteratorFactory implements ParallelIteratorFactory { private final QueryServices services; @@ -64,14 +71,16 @@ public class SpoolingResultIterator implements PeekingResultIterator { this.services = services; } @Override - public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException { - return new SpoolingResultIterator(scanner, services); + public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan, String physicalTableName) throws SQLException { + ReadMetricQueue readRequestMetric = context.getReadMetricsQueue(); + SpoolingMetricsHolder spoolMetrics = new SpoolingMetricsHolder(readRequestMetric, physicalTableName); + MemoryMetricsHolder memoryMetrics = new MemoryMetricsHolder(readRequestMetric, physicalTableName); + return new SpoolingResultIterator(spoolMetrics, memoryMetrics, scanner, services); } - } - public SpoolingResultIterator(ResultIterator scanner, QueryServices services) throws SQLException { - this (scanner, services.getMemoryManager(), + private SpoolingResultIterator(SpoolingMetricsHolder spoolMetrics, MemoryMetricsHolder memoryMetrics, ResultIterator scanner, QueryServices services) throws SQLException { + this (spoolMetrics, memoryMetrics, scanner, services.getMemoryManager(), services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES), services.getProps().getLong(QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SPOOL_TO_DISK_BYTES), services.getProps().get(QueryServices.SPOOL_DIRECTORY, QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY)); @@ -86,9 +95,15 @@ public class SpoolingResultIterator implements PeekingResultIterator { * the memory manager) is exceeded. * @throws SQLException */ - SpoolingResultIterator(ResultIterator scanner, MemoryManager mm, final int thresholdBytes, final long maxSpoolToDisk, final String spoolDirectory) throws SQLException { + SpoolingResultIterator(SpoolingMetricsHolder sMetrics, MemoryMetricsHolder mMetrics, ResultIterator scanner, MemoryManager mm, final int thresholdBytes, final long maxSpoolToDisk, final String spoolDirectory) throws SQLException { + this.spoolMetrics = sMetrics; + this.memoryMetrics = mMetrics; boolean success = false; + long startTime = System.currentTimeMillis(); final MemoryChunk chunk = mm.allocate(0, thresholdBytes); + long waitTime = System.currentTimeMillis() - startTime; + GLOBAL_MEMORY_WAIT_TIME.update(waitTime); + memoryMetrics.getMemoryWaitTimeMetric().change(waitTime); DeferredFileOutputStream spoolTo = null; try { // Can't be bigger than int, since it's the max of the above allocation @@ -96,8 +111,11 @@ public class SpoolingResultIterator implements PeekingResultIterator { spoolTo = new DeferredFileOutputStream(size, "ResultSpooler",".bin", new File(spoolDirectory)) { @Override protected void thresholdReached() throws IOException { - super.thresholdReached(); - chunk.close(); + try { + super.thresholdReached(); + } finally { + chunk.close(); + } } }; DataOutputStream out = new DataOutputStream(spoolTo); @@ -115,9 +133,14 @@ public class SpoolingResultIterator implements PeekingResultIterator { byte[] data = spoolTo.getData(); chunk.resize(data.length); spoolFrom = new InMemoryResultIterator(data, chunk); + GLOBAL_MEMORY_CHUNK_BYTES.update(data.length); + memoryMetrics.getMemoryChunkSizeMetric().change(data.length); } else { - NUM_SPOOL_FILE.increment(); - SPOOL_FILE_SIZE.update(spoolTo.getFile().length()); + long sizeOfSpoolFile = spoolTo.getFile().length(); + GLOBAL_SPOOL_FILE_SIZE.update(sizeOfSpoolFile); + GLOBAL_SPOOL_FILE_COUNTER.increment(); + spoolMetrics.getNumSpoolFileMetric().increment(); + spoolMetrics.getSpoolFileSizeMetric().change(sizeOfSpoolFile); spoolFrom = new OnDiskResultIterator(spoolTo.getFile()); if (spoolTo.getFile() != null) { spoolTo.getFile().deleteOnExit(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index ea13dfd..6f040d1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.monitoring.CombinableMetric; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.Closeables; @@ -44,9 +45,10 @@ public class TableResultIterator extends ExplainTable implements ResultIterator private final Scan scan; private final HTableInterface htable; private volatile ResultIterator delegate; - - public TableResultIterator(StatementContext context, TableRef tableRef) throws SQLException { - this(context, tableRef, context.getScan()); + private final CombinableMetric scanMetrics; + + public TableResultIterator(StatementContext context, TableRef tableRef, CombinableMetric scanMetrics) throws SQLException { + this(context, tableRef, context.getScan(), scanMetrics); } /* @@ -62,7 +64,7 @@ public class TableResultIterator extends ExplainTable implements ResultIterator delegate = this.delegate; if (delegate == null) { try { - this.delegate = delegate = isClosing ? ResultIterator.EMPTY_ITERATOR : new ScanningResultIterator(htable.getScanner(scan)); + this.delegate = delegate = isClosing ? ResultIterator.EMPTY_ITERATOR : new ScanningResultIterator(htable.getScanner(scan), scanMetrics); } catch (IOException e) { Closeables.closeQuietly(htable); throw ServerUtil.parseServerException(e); @@ -73,13 +75,14 @@ public class TableResultIterator extends ExplainTable implements ResultIterator return delegate; } - public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan) throws SQLException { - this(context, tableRef, scan, ScannerCreation.IMMEDIATE); + public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan, CombinableMetric scanMetrics) throws SQLException { + this(context, tableRef, scan, scanMetrics, ScannerCreation.IMMEDIATE); } - public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan, ScannerCreation creationMode) throws SQLException { + public TableResultIterator(StatementContext context, TableRef tableRef, Scan scan, CombinableMetric scanMetrics, ScannerCreation creationMode) throws SQLException { super(context, tableRef); this.scan = scan; + this.scanMetrics = scanMetrics; htable = context.getConnection().getQueryServices().getTable(tableRef.getTable().getPhysicalName().getBytes()); if (creationMode == ScannerCreation.IMMEDIATE) { getDelegate(false); http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java index b7c8b21..2296982 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UnionResultIterators.java @@ -22,6 +22,9 @@ import java.util.List; import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.monitoring.OverAllQueryMetrics; +import org.apache.phoenix.monitoring.ReadMetricQueue; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.util.ServerUtil; @@ -39,14 +42,22 @@ public class UnionResultIterators implements ResultIterators { private final List<List<Scan>> scans; private final List<PeekingResultIterator> iterators; private final List<QueryPlan> plans; - - public UnionResultIterators(List<QueryPlan> plans) throws SQLException { + private final List<ReadMetricQueue> readMetricsList; + private final List<OverAllQueryMetrics> overAllQueryMetricsList; + private boolean closed; + private final StatementContext parentStmtCtx; + public UnionResultIterators(List<QueryPlan> plans, StatementContext parentStmtCtx) throws SQLException { + this.parentStmtCtx = parentStmtCtx; this.plans = plans; int nPlans = plans.size(); iterators = Lists.newArrayListWithExpectedSize(nPlans); splits = Lists.newArrayListWithExpectedSize(nPlans * 30); scans = Lists.newArrayListWithExpectedSize(nPlans * 10); + readMetricsList = Lists.newArrayListWithCapacity(nPlans); + overAllQueryMetricsList = Lists.newArrayListWithCapacity(nPlans); for (QueryPlan plan : this.plans) { + readMetricsList.add(plan.getContext().getReadMetricsQueue()); + overAllQueryMetricsList.add(plan.getContext().getOverallQueryMetrics()); iterators.add(LookAheadResultIterator.wrap(plan.iterator())); splits.addAll(plan.getSplits()); scans.addAll(plan.getScans()); @@ -59,32 +70,47 @@ public class UnionResultIterators implements ResultIterators { } @Override - public void close() throws SQLException { - SQLException toThrow = null; - try { - if (iterators != null) { - for (int index=0; index < iterators.size(); index++) { - PeekingResultIterator iterator = iterators.get(index); - try { - iterator.close(); - } catch (Exception e) { - if (toThrow == null) { - toThrow = ServerUtil.parseServerException(e); - } else { - toThrow.setNextException(ServerUtil.parseServerException(e)); + public void close() throws SQLException { + if (!closed) { + closed = true; + SQLException toThrow = null; + try { + if (iterators != null) { + for (int index=0; index < iterators.size(); index++) { + PeekingResultIterator iterator = iterators.get(index); + try { + iterator.close(); + } catch (Exception e) { + if (toThrow == null) { + toThrow = ServerUtil.parseServerException(e); + } else { + toThrow.setNextException(ServerUtil.parseServerException(e)); + } } } } - } - } catch (Exception e) { - toThrow = ServerUtil.parseServerException(e); - } finally { - if (toThrow != null) { - throw toThrow; + } catch (Exception e) { + toThrow = ServerUtil.parseServerException(e); + } finally { + setMetricsInParentContext(); + if (toThrow != null) { + throw toThrow; + } } } } - + + private void setMetricsInParentContext() { + ReadMetricQueue parentCtxReadMetrics = parentStmtCtx.getReadMetricsQueue(); + for (ReadMetricQueue readMetrics : readMetricsList) { + parentCtxReadMetrics.combineReadMetrics(readMetrics); + } + OverAllQueryMetrics parentCtxQueryMetrics = parentStmtCtx.getOverallQueryMetrics(); + for (OverAllQueryMetrics metric : overAllQueryMetricsList) { + parentCtxQueryMetrics.combine(metric); + } + } + @Override public List<List<Scan>> getScans() { return scans; http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java index dad60c1..5805999 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java @@ -123,7 +123,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd private final Properties info; private List<SQLCloseable> statements = new ArrayList<SQLCloseable>(); private final Map<PDataType<?>, Format> formatters = new HashMap<>(); - private MutationState mutationState; + private final MutationState mutationState; private final int mutateBatchSize; private final Long scn; private boolean isAutoCommit = false; @@ -137,9 +137,9 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd private boolean isClosed = false; private Sampler<?> sampler; private boolean readOnly = false; - private Map<String, String> customTracingAnnotations = emptyMap(); private Consistency consistency = Consistency.STRONG; - + private Map<String, String> customTracingAnnotations = emptyMap(); + private final boolean isRequestLevelMetricsEnabled; static { Tracing.addTraceMetricsSource(); } @@ -237,6 +237,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd ! Objects.equal(tenantId, function.getTenantId())); } }; + this.isRequestLevelMetricsEnabled = JDBCUtil.isCollectingRequestLevelMetricsEnabled(url, info, this.services.getProps()); this.mutationState = newMutationState(maxSize); this.metaData = metaData.pruneTables(pruner); this.metaData = metaData.pruneFunctions(pruner); @@ -438,6 +439,7 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd return; } try { + clearMetrics(); try { if (traceScope != null) { traceScope.close(); @@ -866,4 +868,23 @@ public class PhoenixConnection implements Connection, org.apache.phoenix.jdbc.Jd public void setTraceScope(TraceScope traceScope) { this.traceScope = traceScope; } + + public Map<String, Map<String, Long>> getMutationMetrics() { + return mutationState.getMutationMetricQueue().aggregate(); + } + + public Map<String, Map<String, Long>> getReadMetrics() { + return mutationState.getReadMetricQueue() != null ? mutationState.getReadMetricQueue().aggregate() : Collections.<String, Map<String, Long>>emptyMap(); + } + + public boolean isRequestLevelMetricsEnabled() { + return isRequestLevelMetricsEnabled; + } + + public void clearMetrics() { + mutationState.getMutationMetricQueue().clearMetrics(); + if (mutationState.getReadMetricQueue() != null) { + mutationState.getReadMetricQueue().clearMetrics(); + } + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index d1b3b27..2dd8af4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.compile.ColumnProjector; import org.apache.phoenix.compile.ExpressionProjector; import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; @@ -311,7 +312,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho public static final int CLIENT_KEY_VALUE_BUILDER_THRESHOLD = VersionUtil.encodeVersion("0", "94", "14"); PhoenixDatabaseMetaData(PhoenixConnection connection) throws SQLException { - this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new PhoenixStatement(connection)); + this.emptyResultSet = new PhoenixResultSet(ResultIterator.EMPTY_ITERATOR, RowProjector.EMPTY_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false)); this.connection = connection; } @@ -509,11 +510,10 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho public PhoenixStatement newStatement(PhoenixConnection connection) { return new PhoenixStatement(connection) { @Override - protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector) - throws SQLException { - return new PhoenixResultSet( - new TenantColumnFilteringIterator(iterator, projector), - projector, this); + protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector, + StatementContext context) throws SQLException { + return new PhoenixResultSet(new TenantColumnFilteringIterator(iterator, projector), + projector, context); } }; } @@ -523,7 +523,12 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho } return stmt.executeQuery(buf.toString()); } - + +// private ColumnResolver getColumnResolverForCatalogTable() throws SQLException { +// TableRef tableRef = new TableRef(getTable(connection, SYSTEM_CATALOG_NAME)); +// return FromCompiler.getResolver(tableRef); +// } + /** * Filters the tenant id column out of a column metadata result set (thus, where each row is a column definition). * The tenant id is by definition the first column of the primary key, but the primary key does not necessarily @@ -1007,7 +1012,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData, org.apache.pho } @Override public ResultSet getTableTypes() throws SQLException { - return new PhoenixResultSet(new MaterializedResultIterator(TABLE_TYPE_TUPLES), TABLE_TYPE_ROW_PROJECTOR, new PhoenixStatement(connection)); + return new PhoenixResultSet(new MaterializedResultIterator(TABLE_TYPE_TUPLES), TABLE_TYPE_ROW_PROJECTOR, new StatementContext(new PhoenixStatement(connection), false)); } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java index 8ee56ea..da06370 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixResultSet.java @@ -39,16 +39,21 @@ import java.sql.Time; import java.sql.Timestamp; import java.text.Format; import java.util.Calendar; +import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.ColumnProjector; import org.apache.phoenix.compile.RowProjector; +import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.monitoring.OverAllQueryMetrics; +import org.apache.phoenix.monitoring.ReadMetricQueue; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PBoolean; @@ -109,18 +114,25 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho private final ResultIterator scanner; private final RowProjector rowProjector; private final PhoenixStatement statement; + private final StatementContext context; + private final ReadMetricQueue readMetricsQueue; + private final OverAllQueryMetrics overAllQueryMetrics; private final ImmutableBytesWritable ptr = new ImmutableBytesWritable(); private Tuple currentRow = BEFORE_FIRST; private boolean isClosed = false; private boolean wasNull = false; - - public PhoenixResultSet(ResultIterator resultIterator, RowProjector rowProjector, PhoenixStatement statement) throws SQLException { + private boolean firstRecordRead = false; + + public PhoenixResultSet(ResultIterator resultIterator, RowProjector rowProjector, StatementContext ctx) throws SQLException { this.rowProjector = rowProjector; this.scanner = resultIterator; - this.statement = statement; + this.context = ctx; + this.statement = context.getStatement(); + this.readMetricsQueue = context.getReadMetricsQueue(); + this.overAllQueryMetrics = context.getOverallQueryMetrics(); } - + @Override public boolean absolute(int row) throws SQLException { throw new SQLFeatureNotSupportedException(); @@ -147,14 +159,14 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho @Override public void close() throws SQLException { - if (isClosed) { - return; - } + if (isClosed) { return; } try { scanner.close(); } finally { isClosed = true; statement.getResultSets().remove(this); + overAllQueryMetrics.endQuery(); + overAllQueryMetrics.stopResultSetWatch(); } } @@ -754,6 +766,10 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho public boolean next() throws SQLException { checkOpen(); try { + if (!firstRecordRead) { + firstRecordRead = true; + overAllQueryMetrics.startResultSetWatch(); + } currentRow = scanner.next(); rowProjector.reset(); } catch (RuntimeException e) { @@ -764,6 +780,10 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho } throw e; } + if (currentRow == null) { + overAllQueryMetrics.endQuery(); + overAllQueryMetrics.stopResultSetWatch(); + } return currentRow != null; } @@ -1261,4 +1281,18 @@ public class PhoenixResultSet implements ResultSet, SQLCloseable, org.apache.pho public ResultIterator getUnderlyingIterator() { return scanner; } + + public Map<String, Map<String, Long>> getReadMetrics() { + return readMetricsQueue.aggregate(); + } + + public Map<String, Long> getOverAllRequestReadMetrics() { + return overAllQueryMetrics.publish(); + } + + public void resetMetrics() { + readMetricsQueue.clearMetrics(); + overAllQueryMetrics.reset(); + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 7c94d62..c6c5b0c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -17,9 +17,9 @@ */ package org.apache.phoenix.jdbc; -import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.MUTATION_COUNT; -import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.QUERY_COUNT; -import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.QUERY_TIME; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQL_COUNTER; import java.io.IOException; import java.io.Reader; @@ -216,8 +216,8 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho return resultSets; } - protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector) throws SQLException { - return new PhoenixResultSet(iterator, projector, this); + protected PhoenixResultSet newResultSet(ResultIterator iterator, RowProjector projector, StatementContext context) throws SQLException { + return new PhoenixResultSet(iterator, projector, context); } protected boolean execute(final CompilableStatement stmt) throws SQLException { @@ -235,7 +235,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho } protected PhoenixResultSet executeQuery(final CompilableStatement stmt) throws SQLException { - QUERY_COUNT.increment(); + GLOBAL_SELECT_SQL_COUNTER.increment(); try { return CallRunner.run( new CallRunner.CallableThrowable<PhoenixResultSet, SQLException>() { @@ -253,7 +253,9 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho String explainPlan = QueryUtil.getExplainPlan(resultIterator); logger.debug(LogUtil.addCustomAnnotations("Explain plan: " + explainPlan, connection)); } - PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector()); + StatementContext context = plan.getContext(); + context.getOverallQueryMetrics().startQuery(); + PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), context); resultSets.add(rs); setLastQueryPlan(plan); setLastResultSet(rs); @@ -272,7 +274,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho // Regardless of whether the query was successfully handled or not, // update the time spent so far. If needed, we can separate out the // success times and failure times. - QUERY_TIME.update(System.currentTimeMillis() - startTime); + GLOBAL_QUERY_TIME.update(System.currentTimeMillis() - startTime); } } }, PhoenixContextExecutor.inContext()); @@ -288,7 +290,7 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho SQLExceptionCode.READ_ONLY_CONNECTION). build().buildException(); } - MUTATION_COUNT.increment(); + GLOBAL_MUTATION_SQL_COUNTER.increment(); try { return CallRunner .run( http://git-wip-us.apache.org/repos/asf/phoenix/blob/0f6595c0/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java b/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java index 31ef742..7406e46 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/job/JobManager.java @@ -17,11 +17,11 @@ */ package org.apache.phoenix.job; -import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.REJECTED_TASK_COUNT; -import static org.apache.phoenix.monitoring.PhoenixMetrics.CountMetric.TASK_COUNT; -import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.TASK_END_TO_END_TIME; -import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.TASK_EXECUTION_TIME; -import static org.apache.phoenix.monitoring.PhoenixMetrics.SizeMetric.TASK_QUEUE_WAIT_TIME; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_REJECTED_TASK_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTED_COUNTER; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_QUEUE_WAIT_TIME; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; @@ -36,6 +36,10 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; + +import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder; + import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * @@ -63,6 +67,7 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> { public static interface JobRunnable<T> extends Runnable { public Object getJobId(); + public TaskExecutionMetricsHolder getTaskExecutionMetric(); } public static ThreadPoolExecutor createThreadPoolExec(int keepAliveMs, int size, int queueSize, boolean useInstrumentedThreadPool) { @@ -117,13 +122,17 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> { */ static class JobFutureTask<T> extends FutureTask<T> { private final Object jobId; + @Nullable + private final TaskExecutionMetricsHolder taskMetric; public JobFutureTask(Runnable r, T t) { super(r, t); if(r instanceof JobRunnable){ this.jobId = ((JobRunnable)r).getJobId(); + this.taskMetric = ((JobRunnable)r).getTaskExecutionMetric(); } else { this.jobId = this; + this.taskMetric = null; } } @@ -132,8 +141,10 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> { // FIXME: this fails when executor used by hbase if (c instanceof JobCallable) { this.jobId = ((JobCallable<T>) c).getJobId(); + this.taskMetric = ((JobCallable<T>) c).getTaskExecutionMetric(); } else { this.jobId = this; + this.taskMetric = null; } } @@ -187,6 +198,7 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> { */ public static interface JobCallable<T> extends Callable<T> { public Object getJobId(); + public TaskExecutionMetricsHolder getTaskExecutionMetric(); } @@ -224,27 +236,40 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> { private final RejectedExecutionHandler rejectedExecHandler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - REJECTED_TASK_COUNT.increment(); + TaskExecutionMetricsHolder metrics = getRequestMetric(r); + if (metrics != null) { + metrics.getNumRejectedTasks().increment(); + } + GLOBAL_REJECTED_TASK_COUNTER.increment(); throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + executor.toString()); } }; - public InstrumentedThreadPoolExecutor(String threadPoolName, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, - BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { + public InstrumentedThreadPoolExecutor(String threadPoolName, int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); setRejectedExecutionHandler(rejectedExecHandler); } @Override public void execute(Runnable task) { - TASK_COUNT.increment(); + TaskExecutionMetricsHolder metrics = getRequestMetric(task); + if (metrics != null) { + metrics.getNumTasks().increment(); + } + GLOBAL_TASK_EXECUTED_COUNTER.increment(); super.execute(task); } @Override protected void beforeExecute(Thread worker, Runnable task) { InstrumentedJobFutureTask instrumentedTask = (InstrumentedJobFutureTask)task; - TASK_QUEUE_WAIT_TIME.update(System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime()); + long queueWaitTime = System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime(); + GLOBAL_TASK_QUEUE_WAIT_TIME.update(queueWaitTime); + TaskExecutionMetricsHolder metrics = getRequestMetric(task); + if (metrics != null) { + metrics.getTaskQueueWaitTime().change(queueWaitTime); + } super.beforeExecute(worker, instrumentedTask); } @@ -254,10 +279,21 @@ public class JobManager<T> extends AbstractRoundRobinQueue<T> { try { super.afterExecute(instrumentedTask, t); } finally { - TASK_EXECUTION_TIME.update(System.currentTimeMillis() - instrumentedTask.getTaskExecutionStartTime()); - TASK_END_TO_END_TIME.update(System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime()); + long taskExecutionTime = System.currentTimeMillis() - instrumentedTask.getTaskExecutionStartTime(); + long endToEndTaskTime = System.currentTimeMillis() - instrumentedTask.getTaskSubmissionTime(); + TaskExecutionMetricsHolder metrics = getRequestMetric(task); + if (metrics != null) { + metrics.getTaskExecutionTime().change(taskExecutionTime); + metrics.getTaskEndToEndTime().change(endToEndTaskTime); + } + GLOBAL_TASK_EXECUTION_TIME.update(taskExecutionTime); + GLOBAL_TASK_END_TO_END_TIME.update(endToEndTaskTime); } } + + private static TaskExecutionMetricsHolder getRequestMetric(Runnable task) { + return ((JobFutureTask)task).taskMetric; + } } }