Repository: phoenix Updated Branches: refs/heads/master 6b0461002 -> 3acedb726
Added custom annotations to logs in query-related functionality in client- and server-side code for PHOENIX-1198 Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/3acedb72 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/3acedb72 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/3acedb72 Branch: refs/heads/master Commit: 3acedb7262dbf5241e443a2769565e6750b4ac14 Parents: 6b04610 Author: Eli Levine <elilev...@apache.org> Authored: Fri Sep 26 10:47:33 2014 -0700 Committer: Eli Levine <elilev...@apache.org> Committed: Fri Sep 26 10:47:33 2014 -0700 ---------------------------------------------------------------------- .../apache/phoenix/cache/ServerCacheClient.java | 18 +++------ .../apache/phoenix/compile/FromCompiler.java | 3 +- .../coprocessor/BaseScannerRegionObserver.java | 5 +++ .../GroupedAggregateRegionObserver.java | 39 +++++++++++--------- .../UngroupedAggregateRegionObserver.java | 7 ++-- .../apache/phoenix/execute/BaseQueryPlan.java | 15 ++++++++ .../apache/phoenix/execute/HashJoinPlan.java | 5 +-- .../apache/phoenix/execute/MutationState.java | 12 +++--- .../phoenix/iterate/ChunkedResultIterator.java | 14 +++++-- .../phoenix/iterate/ConcatResultIterator.java | 5 +++ .../DefaultParallelIteratorRegionSplitter.java | 3 +- .../DistinctAggregatingResultIterator.java | 7 ++++ .../FilterAggregatingResultIterator.java | 6 +++ .../phoenix/iterate/FilterResultIterator.java | 6 +++ .../GroupedAggregatingResultIterator.java | 7 ++++ .../phoenix/iterate/LimitingResultIterator.java | 5 +++ .../iterate/MergeSortRowKeyResultIterator.java | 6 +++ .../iterate/MergeSortTopNResultIterator.java | 7 ++++ .../phoenix/iterate/OrderedResultIterator.java | 10 +++++ .../phoenix/iterate/ParallelIterators.java | 8 +++- .../iterate/RegionScannerResultIterator.java | 5 +++ .../phoenix/iterate/ScanningResultIterator.java | 5 +++ .../phoenix/iterate/SequenceResultIterator.java | 6 +++ .../phoenix/iterate/TableResultIterator.java | 6 +++ .../UngroupedAggregatingResultIterator.java | 6 +++ .../query/ConnectionQueryServicesImpl.java | 1 - .../apache/phoenix/schema/MetaDataClient.java | 7 ++-- .../org/apache/phoenix/trace/TraceReader.java | 2 +- .../apache/phoenix/trace/TracingIterator.java | 5 +++ .../java/org/apache/phoenix/util/LogUtil.java | 27 ++++++++++++-- .../java/org/apache/phoenix/util/ScanUtil.java | 10 +++++ .../org/apache/phoenix/util/LogUtilTest.java | 27 +++++++++++--- 32 files changed, 231 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java index 1301fb7..fa19881 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java @@ -167,7 +167,7 @@ public class ServerCacheClient { if ( ! servers.contains(entry) && keyRanges.intersect(entry.getRegionInfo().getStartKey(), entry.getRegionInfo().getEndKey())) { // Call RPC once per server servers.add(entry); - if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Adding cache entry to be sent for " + entry, connection.getCustomTracingAnnotations()));} + if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Adding cache entry to be sent for " + entry, connection));} final byte[] key = entry.getRegionInfo().getStartKey(); final HTableInterface htable = services.getTable(cacheUsingTableRef.getTable().getPhysicalName().getBytes()); closeables.add(htable); @@ -220,7 +220,7 @@ public class ServerCacheClient { } })); } else { - if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("NOT adding cache entry to be sent for " + entry + " since one already exists for that entry", connection.getCustomTracingAnnotations()));} + if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("NOT adding cache entry to be sent for " + entry + " since one already exists for that entry", connection));} } } @@ -259,7 +259,7 @@ public class ServerCacheClient { } } } - if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Cache " + cacheId + " successfully added to servers.", connection.getCustomTracingAnnotations()));} + if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Cache " + cacheId + " successfully added to servers.", connection));} return hashCacheSpec; } @@ -285,7 +285,7 @@ public class ServerCacheClient { * this, we iterate through the current metadata boundaries and remove the cache once for each * server that we originally sent to. */ - if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Removing Cache " + cacheId + " from servers.", connection.getCustomTracingAnnotations()));} + if (LOG.isDebugEnabled()) {LOG.debug(addCustomAnnotations("Removing Cache " + cacheId + " from servers.", connection));} for (HRegionLocation entry : locations) { if (remainingOnServers.contains(entry)) { // Call once per server try { @@ -313,19 +313,13 @@ public class ServerCacheClient { } catch (Throwable t) { lastThrowable = t; Map<String, String> customAnnotations = emptyMap(); - if (connection != null) { - customAnnotations = connection.getCustomTracingAnnotations(); - } - LOG.error(addCustomAnnotations("Error trying to remove hash cache for " + entry, customAnnotations), t); + LOG.error(addCustomAnnotations("Error trying to remove hash cache for " + entry, connection), t); } } } if (!remainingOnServers.isEmpty()) { Map<String, String> customAnnotations = emptyMap(); - if (connection != null) { - customAnnotations = connection.getCustomTracingAnnotations(); - } - LOG.warn(addCustomAnnotations("Unable to remove hash cache for " + remainingOnServers, customAnnotations), lastThrowable); + LOG.warn(addCustomAnnotations("Unable to remove hash cache for " + remainingOnServers, connection), lastThrowable); } } finally { Closeables.closeQuietly(iterateOverTable); http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java index 8336f3e..5ee29e2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java @@ -68,6 +68,7 @@ import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.util.Closeables; +import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.SchemaUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -320,7 +321,7 @@ public class FromCompiler { theTable = addDynamicColumns(dynamicColumns, theTable); TableRef tableRef = new TableRef(tableNode.getAlias(), theTable, timeStamp, !dynamicColumns.isEmpty()); if (logger.isDebugEnabled() && timeStamp != QueryConstants.UNSET_TIMESTAMP) { - logger.debug("Re-resolved stale table " + fullTableName + " with seqNum " + tableRef.getTable().getSequenceNumber() + " at timestamp " + tableRef.getTable().getTimeStamp() + " with " + tableRef.getTable().getColumns().size() + " columns: " + tableRef.getTable().getColumns()); + logger.debug(LogUtil.addCustomAnnotations("Re-resolved stale table " + fullTableName + " with seqNum " + tableRef.getTable().getSequenceNumber() + " at timestamp " + tableRef.getTable().getTimeStamp() + " with " + tableRef.getTable().getColumns().size() + " columns: " + tableRef.getTable().getColumns(), connection)); } return tableRef; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 1129eef..07b1495 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -61,6 +61,11 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String EXPECTED_UPPER_REGION_KEY = "_ExpectedUpperRegionKey"; public static final String REVERSE_SCAN = "_ReverseScan"; public static final String ANALYZE_TABLE = "_ANALYZETABLE"; + /** + * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations + * are used to augment log lines emitted by Phoenix. See https://issues.apache.org/jira/browse/PHOENIX-1198. + */ + public static final String CUSTOM_ANNOTATIONS = "_Annot"; /** Exposed for testing */ public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index c352e9a..8add152 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -69,6 +69,7 @@ import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; import org.apache.phoenix.util.Closeables; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SizedUtil; import org.apache.phoenix.util.TupleUtil; @@ -235,10 +236,11 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { private final Map<ImmutableBytesPtr, Aggregator[]> aggregateMap; private final ServerAggregators aggregators; private final RegionCoprocessorEnvironment env; + private final byte[] customAnnotations; private int estDistVals; - InMemoryGroupByCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, ServerAggregators aggregators, int estDistVals) { + InMemoryGroupByCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, byte[] customAnnotations, ServerAggregators aggregators, int estDistVals) { int estValueSize = aggregators.getEstimatedByteSize(); long estSize = sizeOfUnorderedGroupByMap(estDistVals, estValueSize); TenantCache tenantCache = GlobalCache.getTenantCache(env, tenantId); @@ -247,6 +249,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { this.aggregators = aggregators; this.aggregateMap = Maps.newHashMapWithExpectedSize(estDistVals); this.chunk = tenantCache.getMemoryManager().allocate(estSize); + this.customAnnotations = customAnnotations; } @Override @@ -263,9 +266,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { // value, clone our original one (we need one // per distinct value) if (logger.isDebugEnabled()) { - logger.debug("Adding new aggregate bucket for row key " + logger.debug(LogUtil.addCustomAnnotations("Adding new aggregate bucket for row key " + Bytes.toStringBinary(key.get(), key.getOffset(), - key.getLength())); + key.getLength()), customAnnotations)); } rowAggregators = aggregators.newAggregators(env.getConfiguration()); @@ -298,10 +301,10 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { byte[] value = aggregators.toBytes(rowAggregators); if (logger.isDebugEnabled()) { - logger.debug("Adding new distinct group: " + logger.debug(LogUtil.addCustomAnnotations("Adding new distinct group: " + Bytes.toStringBinary(key.get(), key.getOffset(), key.getLength()) + " with aggregators " + Arrays.asList(rowAggregators).toString() - + " value = " + Bytes.toStringBinary(value)); + + " value = " + Bytes.toStringBinary(value), customAnnotations)); } KeyValue keyValue = KeyValueUtil.newKeyValue(key.get(), key.getOffset(), key.getLength(), @@ -354,7 +357,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { private GroupByCacheFactory() { } - GroupByCache newCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, ServerAggregators aggregators, int estDistVals) { + GroupByCache newCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, byte[] customAnnotations, ServerAggregators aggregators, int estDistVals) { Configuration conf = env.getConfiguration(); boolean spillableEnabled = conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE); @@ -362,7 +365,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { return new SpillableGroupByCache(env, tenantId, aggregators, estDistVals); } - return new InMemoryGroupByCache(env, tenantId, aggregators, estDistVals); + return new InMemoryGroupByCache(env, tenantId, customAnnotations, aggregators, estDistVals); } } /** @@ -378,8 +381,8 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { List<IndexMaintainer> indexMaintainers, HRegion dataRegion, byte[][] viewConstants) throws IOException { if (logger.isDebugEnabled()) { - logger.debug("Grouped aggregation over unordered rows with scan " + scan - + ", group by " + expressions + ", aggregators " + aggregators); + logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over unordered rows with scan " + scan + + ", group by " + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan))); } RegionCoprocessorEnvironment env = c.getEnvironment(); Configuration conf = env.getConfiguration(); @@ -396,7 +399,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { GroupByCache groupByCache = GroupByCacheFactory.INSTANCE.newCache( - env, ScanUtil.getTenantId(scan), + env, ScanUtil.getTenantId(scan), ScanUtil.getCustomAnnotations(scan), aggregators, estDistVals); ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); boolean success = false; @@ -405,7 +408,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { MultiKeyValueTuple result = new MultiKeyValueTuple(); if (logger.isDebugEnabled()) { - logger.debug("Spillable groupby enabled: " + spillableEnabled); + logger.debug(LogUtil.addCustomAnnotations("Spillable groupby enabled: " + spillableEnabled, ScanUtil.getCustomAnnotations(scan))); } HRegion region = c.getEnvironment().getRegion(); @@ -459,15 +462,15 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { * @throws IOException */ private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c, - Scan scan, final RegionScanner s, final List<Expression> expressions, + final Scan scan, final RegionScanner s, final List<Expression> expressions, final ServerAggregators aggregators, final long limit, final int offset, final boolean localIndexScan, final ColumnReference[] dataColumns, final TupleProjector tupleProjector, final List<IndexMaintainer> indexMaintainers, final HRegion dataRegion, final byte[][] viewConstants) throws IOException { if (logger.isDebugEnabled()) { - logger.debug("Grouped aggregation over ordered rows with scan " + scan + ", group by " - + expressions + ", aggregators " + aggregators); + logger.debug(LogUtil.addCustomAnnotations("Grouped aggregation over ordered rows with scan " + scan + ", group by " + + expressions + ", aggregators " + aggregators, ScanUtil.getCustomAnnotations(scan))); } final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); return new BaseRegionScanner() { @@ -518,9 +521,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { if (!aggBoundary) { aggregators.aggregate(rowAggregators, result); if (logger.isDebugEnabled()) { - logger.debug("Row passed filters: " + kvs + logger.debug(LogUtil.addCustomAnnotations("Row passed filters: " + kvs + ", aggregated values: " - + Arrays.asList(rowAggregators)); + + Arrays.asList(rowAggregators), ScanUtil.getCustomAnnotations(scan))); } currentKey = key; } @@ -541,12 +544,12 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { AGG_TIMESTAMP, value, 0, value.length); results.add(keyValue); if (logger.isDebugEnabled()) { - logger.debug("Adding new aggregate row: " + logger.debug(LogUtil.addCustomAnnotations("Adding new aggregate row: " + keyValue + ",for current key " + Bytes.toStringBinary(currentKey.get(), currentKey.getOffset(), currentKey.getLength()) + ", aggregated values: " - + Arrays.asList(rowAggregators)); + + Arrays.asList(rowAggregators), ScanUtil.getCustomAnnotations(scan))); } // If we're at an aggregation boundary, reset the // aggregators and http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index d39f868..dae7465 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -87,6 +87,7 @@ import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; @@ -233,7 +234,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ boolean hasAny = false; MultiKeyValueTuple result = new MultiKeyValueTuple(); if (logger.isInfoEnabled()) { - logger.info("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo()); + logger.info(LogUtil.addCustomAnnotations("Starting ungrouped coprocessor scan " + scan + " "+region.getRegionInfo(), ScanUtil.getCustomAnnotations(scan))); } long rowCount = 0; region.startRegionOperation(); @@ -359,7 +360,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } catch (ConstraintViolationException e) { // Log and ignore in count - logger.error("Failed to create row in " + region.getRegionNameAsString() + " with values " + SchemaUtil.toString(values), e); + logger.error(LogUtil.addCustomAnnotations("Failed to create row in " + region.getRegionNameAsString() + " with values " + SchemaUtil.toString(values), ScanUtil.getCustomAnnotations(scan)), e); continue; } aggregators.aggregate(rowAggregators, result); @@ -379,7 +380,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } if (logger.isInfoEnabled()) { - logger.info("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan); + logger.info(LogUtil.addCustomAnnotations("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan, ScanUtil.getCustomAnnotations(scan))); } if (!mutations.isEmpty()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index d4c119b..f5f130f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -26,6 +26,8 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -60,6 +62,7 @@ import org.apache.phoenix.trace.TracingIterator; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ScanUtil; @@ -77,6 +80,7 @@ import com.google.common.collect.Lists; * @since 0.1 */ public abstract class BaseQueryPlan implements QueryPlan { + private static final Log LOG = LogFactory.getLog(BaseQueryPlan.class); protected static final long DEFAULT_ESTIMATED_SIZE = 10 * 1024; // 10 K protected final TableRef tableRef; @@ -184,6 +188,8 @@ public abstract class BaseQueryPlan implements QueryPlan { ScanUtil.setTimeRange(scan, context.getScanTimeRange()); } ScanUtil.setTenantId(scan, connection.getTenantId() == null ? null : connection.getTenantId().getBytes()); + String customAnnotations = LogUtil.customAnnotationsToString(connection); + ScanUtil.setCustomAnnotations(scan, customAnnotations == null ? null : customAnnotations.getBytes()); // Set local index related scan attributes. if (context.getCurrentTable().getTable().getIndexType() == IndexType.LOCAL) { ScanUtil.setLocalIndex(scan); @@ -213,6 +219,11 @@ public abstract class BaseQueryPlan implements QueryPlan { serializeViewConstantsIntoScan(scan, dataTable); } } + + if (LOG.isDebugEnabled()) { + LOG.debug(LogUtil.addCustomAnnotations("Scan ready for iteration: " + scan, connection)); + } + ResultIterator iterator = newIterator(); iterator = dependencies.isEmpty() ? iterator : new DelegateResultIterator(iterator) { @@ -225,6 +236,10 @@ public abstract class BaseQueryPlan implements QueryPlan { } } }; + + if (LOG.isDebugEnabled()) { + LOG.debug(LogUtil.addCustomAnnotations("Iterator ready: " + iterator, connection)); + } // wrap the iterator so we start/end tracing as we expect TraceScope scope = http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java index 0d09f79..c805b7e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java @@ -23,7 +23,6 @@ import java.sql.ParameterMetaData; import java.sql.SQLException; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -93,7 +92,6 @@ public class HashJoinPlan implements QueryPlan { private HashCacheClient hashClient; private int maxServerCacheTimeToLive; private AtomicLong firstJobEndTime; - private Map<String, String> customAnnotations; private List<Expression> keyRangeExpressions; public static HashJoinPlan create(FilterableStatement statement, @@ -153,7 +151,6 @@ public class HashJoinPlan implements QueryPlan { hashClient = new HashCacheClient(plan.getContext().getConnection()); maxServerCacheTimeToLive = services.getProps().getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS); firstJobEndTime = new AtomicLong(0); - customAnnotations = connection.getCustomTracingAnnotations(); keyRangeExpressions = new CopyOnWriteArrayList<Expression>(); } @@ -429,7 +426,7 @@ public class HashJoinPlan implements QueryPlan { long endTime = System.currentTimeMillis(); boolean isSet = parent.firstJobEndTime.compareAndSet(0, endTime); if (!isSet && (endTime - parent.firstJobEndTime.get()) > parent.maxServerCacheTimeToLive) { - LOG.warn(addCustomAnnotations("Hash plan [" + index + "] execution seems too slow. Earlier hash cache(s) might have expired on servers.", parent.customAnnotations)); + LOG.warn(addCustomAnnotations("Hash plan [" + index + "] execution seems too slow. Earlier hash cache(s) might have expired on servers.", parent.plan.context.getConnection())); } if (keyRangeRhsValues != null) { parent.keyRangeExpressions.add(parent.createKeyRangeExpression(keyRangeLhsExpression, keyRangeRhsExpression, keyRangeRhsValues, plan.getContext().getTempPtr(), hasFilters)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 8972650..24fef10 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -51,6 +51,7 @@ import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.trace.util.Tracing; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.ServerUtil; @@ -275,7 +276,6 @@ public class MutationState implements SQLCloseable { private long[] validate() throws SQLException { int i = 0; Long scn = connection.getSCN(); - PName tenantId = connection.getTenantId(); MetaDataClient client = new MetaDataClient(connection); long[] timeStamps = new long[this.mutations.size()]; for (Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry : mutations.entrySet()) { @@ -315,7 +315,7 @@ public class MutationState implements SQLCloseable { return timeStamps; } - private static void logMutationSize(HTableInterface htable, List<Mutation> mutations) { + private static void logMutationSize(HTableInterface htable, List<Mutation> mutations, PhoenixConnection connection) { long byteSize = 0; int keyValueCount = 0; for (Mutation mutation : mutations) { @@ -330,7 +330,7 @@ public class MutationState implements SQLCloseable { } } } - logger.debug("Sending " + mutations.size() + " mutations for " + Bytes.toString(htable.getTableName()) + " with " + keyValueCount + " key values of total size " + byteSize + " bytes"); + logger.debug(LogUtil.addCustomAnnotations("Sending " + mutations.size() + " mutations for " + Bytes.toString(htable.getTableName()) + " with " + keyValueCount + " key values of total size " + byteSize + " bytes", connection)); } @SuppressWarnings("deprecation") @@ -399,13 +399,13 @@ public class MutationState implements SQLCloseable { SQLException sqlE = null; HTableInterface hTable = connection.getQueryServices().getTable(htableName); try { - if (logger.isDebugEnabled()) logMutationSize(hTable, mutations); + if (logger.isDebugEnabled()) logMutationSize(hTable, mutations, connection); long startTime = System.currentTimeMillis(); child.addTimelineAnnotation("Attempt " + retryCount); hTable.batch(mutations); child.stop(); shouldRetry = false; - if (logger.isDebugEnabled()) logger.debug("Total time for batch call of " + mutations.size() + " mutations into " + table.getName().getString() + ": " + (System.currentTimeMillis() - startTime) + " ms"); + if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Total time for batch call of " + mutations.size() + " mutations into " + table.getName().getString() + ": " + (System.currentTimeMillis() - startTime) + " ms", connection)); committedList.add(entry); } catch (Exception e) { SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e); @@ -415,7 +415,7 @@ public class MutationState implements SQLCloseable { // and one of the region servers doesn't have it. This will cause it to have it the next go around. // If it fails again, we don't retry. String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE; - logger.warn(msg); + logger.warn(LogUtil.addCustomAnnotations(msg, connection)); connection.getQueryServices().clearTableRegionCache(htableName); // add a new child span as this one failed http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java index c702e99..4a62259 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java @@ -30,6 +30,7 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.ScanUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +66,7 @@ public class ChunkedResultIterator implements PeekingResultIterator { @Override public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException { scanner.close(); //close the iterator since we don't need it anymore. - if (logger.isDebugEnabled()) logger.debug("ChunkedResultIteratorFactory.newIterator over " + tableRef.getTable().getName().getString() + " with " + scan); + if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("ChunkedResultIteratorFactory.newIterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan))); return new ChunkedResultIterator(delegateFactory, context, tableRef, scan, context.getConnection().getQueryServices().getProps().getLong( QueryServices.SCAN_RESULT_CHUNK_SIZE, @@ -83,7 +84,7 @@ public class ChunkedResultIterator implements PeekingResultIterator { // Instantiate single chunk iterator and the delegate iterator in constructor // to get parallel scans kicked off in separate threads. If we delay this, // we'll get serialized behavior (see PHOENIX- - if (logger.isDebugEnabled()) logger.debug("Get first chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan); + if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get first chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan))); ResultIterator singleChunkResultIterator = new SingleChunkResultIterator( new TableResultIterator(context, tableRef, scan), chunkSize); resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan); @@ -114,7 +115,7 @@ public class ChunkedResultIterator implements PeekingResultIterator { resultIterator.close(); scan = ScanUtil.newScan(scan); scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey)); - if (logger.isDebugEnabled()) logger.debug("Get next chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan); + if (logger.isDebugEnabled()) logger.debug(LogUtil.addCustomAnnotations("Get next chunked result iterator over " + tableRef.getTable().getName().getString() + " with " + scan, ScanUtil.getCustomAnnotations(scan))); ResultIterator singleChunkResultIterator = new SingleChunkResultIterator( new TableResultIterator(context, tableRef, scan), chunkSize); resultIterator = delegateIteratorFactory.newIterator(context, singleChunkResultIterator, scan); @@ -180,5 +181,12 @@ public class ChunkedResultIterator implements PeekingResultIterator { return Bytes.compareTo(currentKey, offset, length, lastKey.get(), lastKey.getOffset(), lastKey.getLength()) != 0; } + + @Override + public String toString() { + return "SingleChunkResultIterator [rowCount=" + rowCount + + ", chunkComplete=" + chunkComplete + ", delegate=" + + delegate + ", chunkSize=" + chunkSize + "]"; + } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java index 21ae7d8..f273fdf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ConcatResultIterator.java @@ -86,4 +86,9 @@ public class ConcatResultIterator implements PeekingResultIterator { return currentIterator().next(); } + @Override + public String toString() { + return "ConcatResultIterator [resultIterators=" + resultIterators + + ", iterators=" + iterators + ", index=" + index + "]"; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java index a0ac20c..8fb85ae 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultParallelIteratorRegionSplitter.java @@ -32,6 +32,7 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.SchemaUtil; @@ -163,7 +164,7 @@ public class DefaultParallelIteratorRegionSplitter implements ParallelIteratorRe } } if (logger.isDebugEnabled()) { - logger.debug("The captured guideposts are: " + guidePosts); + logger.debug(LogUtil.addCustomAnnotations("The captured guideposts are: " + guidePosts, ScanUtil.getCustomAnnotations(scan))); } return guidePosts; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java index 1fb54b5..1ba134b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DistinctAggregatingResultIterator.java @@ -158,4 +158,11 @@ public class DistinctAggregatingResultIterator implements AggregatingResultItera public void aggregate(Tuple result) { delegate.aggregate(result); } + + @Override + public String toString() { + return "DistinctAggregatingResultIterator [delegate=" + delegate + + ", rowProjector=" + rowProjector + ", resultIterator=" + + resultIterator + ", ptr1=" + ptr1 + ", ptr2=" + ptr2 + "]"; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java index 98a40a7..15b9094 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterAggregatingResultIterator.java @@ -75,4 +75,10 @@ public class FilterAggregatingResultIterator implements AggregatingResultIterat delegate.explain(planSteps); planSteps.add("CLIENT FILTER BY " + expression.toString()); } + + @Override + public String toString() { + return "FilterAggregatingResultIterator [delegate=" + delegate + + ", expression=" + expression + ", ptr=" + ptr + "]"; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java index e825bf5..0c68a20 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/FilterResultIterator.java @@ -73,4 +73,10 @@ public class FilterResultIterator extends LookAheadResultIterator { delegate.explain(planSteps); planSteps.add("CLIENT FILTER BY " + expression.toString()); } + + @Override + public String toString() { + return "FilterResultIterator [delegate=" + delegate + ", expression=" + + expression + ", ptr=" + ptr + "]"; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java index 50e1bc2..bb9bf50 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/GroupedAggregatingResultIterator.java @@ -101,4 +101,11 @@ public class GroupedAggregatingResultIterator implements AggregatingResultIterat public void explain(List<String> planSteps) { resultIterator.explain(planSteps); } + + @Override + public String toString() { + return "GroupedAggregatingResultIterator [tempPtr=" + tempPtr + + ", resultIterator=" + resultIterator + ", aggregators=" + + aggregators + "]"; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java index f380cf5..2d0ff2c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/LimitingResultIterator.java @@ -52,4 +52,9 @@ public class LimitingResultIterator extends DelegateResultIterator { planSteps.add("CLIENT " + limit + " ROW LIMIT"); } + @Override + public String toString() { + return "LimitingResultIterator [rowCount=" + rowCount + ", limit=" + + limit + "]"; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortRowKeyResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortRowKeyResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortRowKeyResultIterator.java index f2505ce..1da5142 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortRowKeyResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortRowKeyResultIterator.java @@ -56,4 +56,10 @@ public class MergeSortRowKeyResultIterator extends MergeSortResultIterator { resultIterators.explain(planSteps); planSteps.add("CLIENT MERGE SORT"); } + + @Override + public String toString() { + return "MergeSortRowKeyResultIterator [keyOffset=" + keyOffset + + ", factor=" + factor + "]"; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortTopNResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortTopNResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortTopNResultIterator.java index 30f5ee0..64ededa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortTopNResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortTopNResultIterator.java @@ -94,4 +94,11 @@ public class MergeSortTopNResultIterator extends MergeSortResultIterator { planSteps.add(" SERVER" + (limit == -1 ? "" : " TOP " + limit + " ROW" + (limit == 1 ? "" : "S")) + " SORTED BY " + orderByColumns.toString()); planSteps.add("CLIENT MERGE SORT"); } + + @Override + public String toString() { + return "MergeSortTopNResultIterator [limit=" + limit + ", count=" + + count + ", orderByColumns=" + orderByColumns + ", ptr1=" + + ptr1 + ", ptr2=" + ptr2 + "]"; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java index 0d6f6be..d087f80 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java @@ -249,4 +249,14 @@ public class OrderedResultIterator implements PeekingResultIterator { delegate.explain(planSteps); planSteps.add("CLIENT" + (limit == null ? "" : " TOP " + limit + " ROW" + (limit == 1 ? "" : "S")) + " SORTED BY " + orderByExpressions.toString()); } + + @Override + public String toString() { + return "OrderedResultIterator [thresholdBytes=" + thresholdBytes + + ", limit=" + limit + ", delegate=" + delegate + + ", orderByExpressions=" + orderByExpressions + + ", estimatedByteSize=" + estimatedByteSize + + ", resultIterator=" + resultIterator + ", byteSize=" + + byteSize + "]"; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index a9037b4..a2dabe3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -64,6 +64,7 @@ import org.apache.phoenix.schema.SaltingUtil; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.trace.util.Tracing; +import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ScanUtil; @@ -361,7 +362,7 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { long startTime = System.currentTimeMillis(); ResultIterator scanner = new TableResultIterator(context, tableRef, splitScan); if (logger.isDebugEnabled()) { - logger.debug("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan); + logger.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + splitScan, ScanUtil.getCustomAnnotations(splitScan))); } return iteratorFactory.newIterator(context, scanner, splitScan); } @@ -393,4 +394,9 @@ public class ParallelIterators extends ExplainTable implements ResultIterators { buf.append("CLIENT PARALLEL " + size() + "-WAY "); explain(buf.toString(),planSteps); } + + @Override + public String toString() { + return "ParallelIterators [splits=" + splits + "]"; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java index 9d95f30..bff0936 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java @@ -57,4 +57,9 @@ public class RegionScannerResultIterator extends BaseResultIterator { throw ServerUtil.parseServerException(e); } } + + @Override + public String toString() { + return "RegionScannerResultIterator [scanner=" + scanner + "]"; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java index f475296..8aa9a2d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java @@ -56,4 +56,9 @@ public class ScanningResultIterator implements ResultIterator { @Override public void explain(List<String> planSteps) { } + + @Override + public String toString() { + return "ScanningResultIterator [scanner=" + scanner + "]"; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/iterate/SequenceResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SequenceResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SequenceResultIterator.java index f47295d..80b5401 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SequenceResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SequenceResultIterator.java @@ -54,4 +54,10 @@ public class SequenceResultIterator extends DelegateResultIterator { int nSequences = sequenceManager.getSequenceCount(); planSteps.add("CLIENT RESERVE VALUES FROM " + nSequences + " SEQUENCE" + (nSequences == 1 ? "" : "S")); } + + @Override + public String toString() { + return "SequenceResultIterator [sequenceManager=" + sequenceManager + + "]"; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index 97ff563..58abec5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -81,4 +81,10 @@ public class TableResultIterator extends ExplainTable implements ResultIterator StringBuilder buf = new StringBuilder(); explain(buf.toString(),planSteps); } + + @Override + public String toString() { + return "TableResultIterator [htable=" + htable + ", delegate=" + + delegate + "]"; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java index f6e3b82..797f3ce 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/UngroupedAggregatingResultIterator.java @@ -51,4 +51,10 @@ public class UngroupedAggregatingResultIterator extends GroupedAggregatingResult hasRows = true; return result; } + + @Override + public String toString() { + return "UngroupedAggregatingResultIterator [hasRows=" + hasRows + + ", aggregators=" + aggregators + "]"; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 25117ad..9c520d8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -410,7 +410,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement /** * Ensures that metaData mutations are handled in the correct order - * @param tenantId TODO */ private PMetaData metaDataMutated(PName tenantId, String tableName, long tableSeqNum, Mutator mutator) throws SQLException { synchronized (latestMetaDataLock) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 82eb836..5be8c3e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -138,6 +138,7 @@ import org.apache.phoenix.schema.PTable.LinkType; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.LogUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.SchemaUtil; @@ -1662,7 +1663,7 @@ public class MetaDataClient { case CONCURRENT_TABLE_MUTATION: connection.addTable(result.getTable()); if (logger.isDebugEnabled()) { - logger.debug("CONCURRENT_TABLE_MUTATION for table " + SchemaUtil.getTableName(schemaName, tableName)); + logger.debug(LogUtil.addCustomAnnotations("CONCURRENT_TABLE_MUTATION for table " + SchemaUtil.getTableName(schemaName, tableName), connection)); } throw new ConcurrentTableMutationException(schemaName, tableName); case NEWER_TABLE_FOUND: @@ -1752,7 +1753,7 @@ public class MetaDataClient { ColumnResolver resolver = FromCompiler.getResolver(statement, connection); PTable table = resolver.getTables().get(0).getTable(); if (logger.isDebugEnabled()) { - logger.debug("Resolved table to " + table.getName().getString() + " with seqNum " + table.getSequenceNumber() + " at timestamp " + table.getTimeStamp() + " with " + table.getColumns().size() + " columns: " + table.getColumns()); + logger.debug(LogUtil.addCustomAnnotations("Resolved table to " + table.getName().getString() + " with seqNum " + table.getSequenceNumber() + " at timestamp " + table.getTimeStamp() + " with " + table.getColumns().size() + " columns: " + table.getColumns(), connection)); } int position = table.getColumns().size(); @@ -1972,7 +1973,7 @@ public class MetaDataClient { throw e; } if (logger.isDebugEnabled()) { - logger.debug("Caught ConcurrentTableMutationException for table " + SchemaUtil.getTableName(schemaName, tableName) + ". Will try again..."); + logger.debug(LogUtil.addCustomAnnotations("Caught ConcurrentTableMutationException for table " + SchemaUtil.getTableName(schemaName, tableName) + ". Will try again...", connection)); } retried = true; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java index f27cf5c..3d6eb9b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/TraceReader.java @@ -233,7 +233,7 @@ public class TraceReader { private String addCustomAnnotations(String logLine) throws SQLException { if (conn.isWrapperFor(PhoenixConnection.class)) { PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class); - logLine = LogUtil.addCustomAnnotations(logLine, phxConn.getCustomTracingAnnotations()); + logLine = LogUtil.addCustomAnnotations(logLine, phxConn); } return logLine; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/trace/TracingIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/trace/TracingIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/trace/TracingIterator.java index 0b12c43..bee5a1c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/trace/TracingIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/trace/TracingIterator.java @@ -55,4 +55,9 @@ public class TracingIterator extends DelegateResultIterator { } return super.next(); } + + @Override + public String toString() { + return "TracingIterator [scope=" + scope + ", started=" + started + "]"; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/util/LogUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/LogUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/LogUtil.java index 256a260..21dec13 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/LogUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/LogUtil.java @@ -17,18 +17,37 @@ */ package org.apache.phoenix.util; -import java.util.Map; +import javax.annotation.Nullable; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.jdbc.PhoenixConnection; public class LogUtil { private LogUtil() { } - public static String addCustomAnnotations(String logLine, Map<String, String> annotations) { - if (annotations == null || annotations.isEmpty()) { + public static String addCustomAnnotations(@Nullable String logLine, @Nullable PhoenixConnection con) { + if (con == null || con.getCustomTracingAnnotations() == null || con.getCustomTracingAnnotations().isEmpty()) { + return logLine; + } else { + return customAnnotationsToString(con) + ' ' + logLine; + } + } + + public static String addCustomAnnotations(@Nullable String logLine, @Nullable byte[] annotations) { + if (annotations == null) { return logLine; + } else { + return Bytes.toString(annotations) + ' ' + logLine; + } + } + + public static String customAnnotationsToString(@Nullable PhoenixConnection con) { + if (con == null || con.getCustomTracingAnnotations() == null || con.getCustomTracingAnnotations().isEmpty()) { + return null; } else { - return annotations.toString() + ' ' + logLine; + return con.getCustomTracingAnnotations().toString(); } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index daef1c3..fc79173 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.util; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_ANNOTATIONS; + import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; @@ -94,6 +96,14 @@ public class ScanUtil { } return new ImmutableBytesWritable(tenantId); } + + public static void setCustomAnnotations(Scan scan, byte[] annotations) { + scan.setAttribute(CUSTOM_ANNOTATIONS, annotations); + } + + public static byte[] getCustomAnnotations(Scan scan) { + return scan.getAttribute(CUSTOM_ANNOTATIONS); + } public static Scan newScan(Scan scan) { try { http://git-wip-us.apache.org/repos/asf/phoenix/blob/3acedb72/phoenix-core/src/test/java/org/apache/phoenix/util/LogUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/LogUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/LogUtilTest.java index ca9c820..e439355 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/LogUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/LogUtilTest.java @@ -19,25 +19,40 @@ package org.apache.phoenix.util; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; -import java.util.Map; - +import org.apache.phoenix.jdbc.PhoenixConnection; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; import com.google.common.collect.ImmutableMap; +@RunWith(MockitoJUnitRunner.class) public class LogUtilTest { + @Mock PhoenixConnection con; + + @Test + public void testAddCustomAnnotationsWithNullConnection() { + String logLine = LogUtil.addCustomAnnotations("log line", (PhoenixConnection)null); + assertEquals(logLine, "log line"); + } + @Test - public void testAddCustomAnnotationsWithNull() { - String logLine = LogUtil.addCustomAnnotations("log line", null); + public void testAddCustomAnnotationsWithNullAnnotations() { + when(con.getCustomTracingAnnotations()).thenReturn(null); + + String logLine = LogUtil.addCustomAnnotations("log line", con); assertEquals(logLine, "log line"); } @Test public void testAddCustomAnnotations() { - Map<String, String> annotations = ImmutableMap.of("a1", "v1", "a2", "v2"); - String logLine = LogUtil.addCustomAnnotations("log line", annotations); + when(con.getCustomTracingAnnotations()).thenReturn(ImmutableMap.of("a1", "v1", "a2", "v2")); + + String logLine = LogUtil.addCustomAnnotations("log line", con); assertTrue(logLine.contains("log line")); assertTrue(logLine.contains("a1=v1")); assertTrue(logLine.contains("a2=v2"));