Improve OperatorStats to avoid leaking state. Fix an issue where HashJoinBatch throws an NPE in stats tracking when it has no HashTable.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/894037ab Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/894037ab Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/894037ab Branch: refs/heads/master Commit: 894037ab693dea425e88fb3ec3aff73ea5b15eb1 Parents: 2899288 Author: Jacques Nadeau <[email protected]> Authored: Tue Jun 17 19:41:15 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Tue Jun 17 20:18:37 2014 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/ops/OperatorStats.java | 15 +++++++++------ .../drill/exec/physical/impl/WriterRecordBatch.java | 3 ++- .../exec/physical/impl/aggregate/HashAggBatch.java | 4 ++-- .../physical/impl/aggregate/StreamingAggBatch.java | 3 ++- .../drill/exec/physical/impl/join/HashJoinBatch.java | 13 +++++++------ .../exec/physical/impl/join/MergeJoinBatch.java | 4 ++-- .../drill/exec/record/AbstractRecordBatch.java | 10 +++++++--- 7 files changed, 31 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/894037ab/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java index 7d1e9dc..dcb73c8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java @@ -66,41 +66,44 @@ public class OperatorStats { this.schemaCountByInput = new long[inputCount]; } + private String assertionError(String msg){ + return String.format("Failure while %s for operator id %d. 
Currently have states of processing:%s, setup:%s, waiting:%s.", msg, operatorId, inProcessing, inSetup, inWait); + } public void startSetup() { - assert !inSetup : "Failure while starting setup. Currently in setup."; + assert !inSetup : assertionError("starting setup"); stopProcessing(); inSetup = true; setupMark = System.nanoTime(); } public void stopSetup() { - assert inSetup : "Failure while stopping setup. Not currently in setup."; + assert inSetup : assertionError("stopping setup"); startProcessing(); setupNanos += System.nanoTime() - setupMark; inSetup = false; } public void startProcessing() { - assert !inProcessing : "Failure while starting processing. Currently in processing."; + assert !inProcessing : assertionError("starting processing"); processingMark = System.nanoTime(); inProcessing = true; } public void stopProcessing() { - assert inProcessing : "Failure while stopping processing. Not currently in processing."; + assert inProcessing : assertionError("stopping processing"); processingNanos += System.nanoTime() - processingMark; inProcessing = false; } public void startWait() { - assert !inWait : "Failure while starting waiting. Currently in waiting."; + assert !inWait : assertionError("starting waiting"); stopProcessing(); inWait = true; waitMark = System.nanoTime(); } public void stopWait() { - assert inWait : "Failure while stopping waiting. 
Currently not in waiting."; + assert inWait : assertionError("stopping waiting"); startProcessing(); waitNanos += System.nanoTime() - waitMark; inWait = false; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/894037ab/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java index 2dae853..43e0dd4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java @@ -156,9 +156,10 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { // update the schema in RecordWriter stats.startSetup(); recordWriter.updateSchema(incoming.getSchema()); - stats.stopSetup(); } catch(IOException ex) { throw new RuntimeException("Failed to update schema in RecordWriter", ex); + } finally{ + stats.stopSetup(); } eventBasedRecordWriter = new EventBasedRecordWriter(incoming.getSchema(), http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/894037ab/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index 8250682..dd58562 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -154,14 +154,14 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { try{ 
stats.startSetup(); this.aggregator = createAggregatorInternal(); - stats.stopSetup(); return true; }catch(SchemaChangeException | ClassTransformationException | IOException ex){ - stats.stopSetup(); context.fail(ex); container.clear(); incoming.kill(); return false; + }finally{ + stats.stopSetup(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/894037ab/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index 8cad91b..ec12de9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -136,13 +136,14 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { try{ stats.startSetup(); this.aggregator = createAggregatorInternal(); - stats.stopSetup(); return true; }catch(SchemaChangeException | ClassTransformationException | IOException ex){ context.fail(ex); container.clear(); incoming.kill(); return false; + }finally{ + stats.stopSetup(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/894037ab/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 9343912..c43b99a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -134,8 +134,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { boolean firstOutputBatch = true; IterOutcome leftUpstream = IterOutcome.NONE; - - private HashTableStats htStats = new HashTableStats(); + + private final HashTableStats htStats = new HashTableStats(); @Override public int getRecordCount() { @@ -171,9 +171,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { // Create the run time generated code needed to probe and project hashJoinProbe = setupHashJoinProbe(); } - + // Store the number of records projected - if (hashTable != null + if (hashTable != null || joinType != JoinRelType.INNER) { // Allocate the memory for the vectors in the output container @@ -440,12 +440,13 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { } private void updateStats(HashTable htable) { + if(htable == null) return; htable.getStats(htStats); this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_BUCKETS, htStats.numBuckets); this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_ENTRIES, htStats.numEntries); - this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_RESIZING, htStats.numResizing); + this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_RESIZING, htStats.numResizing); } - + @Override public void killIncoming() { this.left.kill(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/894037ab/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 84f8354..e32b653 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -166,12 +166,12 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { stats.startSetup(); this.worker = generateNewWorker(); first = true; - stats.stopSetup(); } catch (ClassTransformationException | IOException | SchemaChangeException e) { - stats.stopSetup(); context.fail(new SchemaChangeException(e)); kill(); return IterOutcome.STOP; + } finally { + stats.stopSetup(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/894037ab/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index 72a7d3b..4c1f82d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -66,9 +66,13 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements } public final IterOutcome next(int inputIndex, RecordBatch b){ + IterOutcome next = null; stats.stopProcessing(); - IterOutcome next = b.next(); - stats.startProcessing(); + try{ + next = b.next(); + }finally{ + stats.startProcessing(); + } switch(next){ case OK_NEW_SCHEMA: @@ -138,7 +142,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements return batch; } - + @Override public VectorContainer getOutgoingContainer() { throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
