DRILL-865: Interface changes to AbstractRecordBatch to enable easy collection of operator stats. Add BaseRootExec as a base class that wraps stats collection for senders.
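The heart of the change is a template-method refactor: AbstractRecordBatch.next() becomes final and delegates to a new abstract innerNext(), so the stats.startProcessing()/stopProcessing() bookkeeping lives in exactly one place, and BaseRootExec applies the same wrapper on the RootExec side for senders. A minimal sketch of the pattern follows; Stats and Outcome here are simplified stand-ins for Drill's OperatorStats and IterOutcome, not the real classes.

    // Sketch of the template-method pattern this commit introduces.
    // Stats is a stand-in for OperatorStats; only the timing hooks matter here.
    abstract class StatsCollectingBatch {

      enum Outcome { OK, NONE }

      static final class Stats {
        private long start;
        private long totalNanos;
        void startProcessing() { start = System.nanoTime(); }
        void stopProcessing()  { totalNanos += System.nanoTime() - start; }
        long totalNanos()      { return totalNanos; }
      }

      protected final Stats stats = new Stats();

      // Declared final so no operator can bypass or forget the timing bracket.
      public final Outcome next() {
        try {
          stats.startProcessing();
          return innerNext();    // subclasses override this instead of next()
        } finally {
          stats.stopProcessing();
        }
      }

      // Per-operator batch logic moves here.
      protected abstract Outcome innerNext();
    }

With that wrapper in place, simple operators such as TopNBatch, SortBatch, and UnionRecordBatch only rename next() to innerNext(), while operators like WriterRecordBatch, HashAggBatch, and MergeJoinBatch also shed their hand-rolled try { ... } finally { stats.stopProcessing(); } blocks, which is where most of the removed lines below come from.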
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/3e98ffcb Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/3e98ffcb Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/3e98ffcb Branch: refs/heads/master Commit: 3e98ffcbaa5b3ae26db6cfac3d223049b1ad9358 Parents: 83fb40e Author: Mehant Baid <[email protected]> Authored: Wed May 21 23:47:08 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Sun Jun 8 19:13:06 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/physical/impl/BaseRootExec.java | 48 +++++ .../exec/physical/impl/TopN/TopNBatch.java | 2 +- .../exec/physical/impl/WriterRecordBatch.java | 155 ++++++++--------- .../physical/impl/aggregate/HashAggBatch.java | 97 +++++------ .../impl/aggregate/StreamingAggBatch.java | 79 ++++----- .../exec/physical/impl/join/HashJoinBatch.java | 6 +- .../exec/physical/impl/join/MergeJoinBatch.java | 141 +++++++-------- .../physical/impl/limit/LimitRecordBatch.java | 4 +- .../impl/mergereceiver/MergingRecordBatch.java | 10 +- .../OrderedPartitionRecordBatch.java | 173 +++++++++---------- .../PartitionSenderRootExec.java | 10 +- .../impl/project/ProjectRecordBatch.java | 4 +- .../exec/physical/impl/sort/SortBatch.java | 2 +- .../impl/svremover/RemovingRecordBatch.java | 4 +- .../physical/impl/union/UnionRecordBatch.java | 2 +- .../physical/impl/xsort/ExternalSortBatch.java | 2 +- .../drill/exec/record/AbstractRecordBatch.java | 40 +++-- .../exec/record/AbstractSingleRecordBatch.java | 71 ++++---- 18 files changed, 426 insertions(+), 424 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java new file mode 100644 index 0000000..0db8c07 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl; + +import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.record.RecordBatch; + +public abstract class BaseRootExec<T extends PhysicalOperator> implements RootExec { + + protected final OperatorStats stats; + protected final OperatorContext oContext; + + public BaseRootExec(FragmentContext context, T operator) throws OutOfMemoryException { + oContext = new OperatorContext(operator, context); + stats = oContext.getStats(); + } + + @Override + public final boolean next() { + try { + stats.startProcessing(); + return innerNext(); + } finally { + stats.stopProcessing(); + } + } + + public abstract boolean innerNext(); +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 1c1a6d2..712311f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -119,7 +119,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { } @Override - public IterOutcome next() { + public IterOutcome innerNext() { if(schema != null){ if(getSelectionVector4().next()){ return IterOutcome.OK; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java index 1113af4..3c4bc41 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java @@ -68,90 +68,83 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { } @Override - public IterOutcome next() { - stats.startProcessing(); - try{ - - if(processed) { - // if the upstream record batch is already processed and next() is called by - // downstream then return NONE to indicate completion - return IterOutcome.NONE; + public IterOutcome innerNext() { + if(processed) { + // if the upstream record batch is already processed and next() is called by + // downstream then return NONE to indicate completion + return IterOutcome.NONE; + } + + // process the complete upstream in one next() call + IterOutcome upstream; + do { + upstream = next(incoming); + if(first && upstream == IterOutcome.OK) + upstream = IterOutcome.OK_NEW_SCHEMA; + first = false; + + switch(upstream) { + case NOT_YET: + case NONE: + case STOP: + cleanup(); + if (upstream == IterOutcome.STOP) + return upstream; + break; + + case OK_NEW_SCHEMA: + try{ + setupNewSchema(); + }catch(Exception ex){ + kill(); + logger.error("Failure during query", ex); + context.fail(ex); + return IterOutcome.STOP; + } + // fall through. 
+ case OK: + try { + counter += eventBasedRecordWriter.write(); + logger.debug("Total records written so far: {}", counter); + } catch(IOException ex) { + throw new RuntimeException(ex); + } + + for(VectorWrapper v : incoming) + v.getValueVector().clear(); + + break; + + default: + throw new UnsupportedOperationException(); } + } while(upstream != IterOutcome.NONE); - // process the complete upstream in one next() call - IterOutcome upstream; - do { - upstream = next(incoming); - if(first && upstream == IterOutcome.OK) - upstream = IterOutcome.OK_NEW_SCHEMA; - first = false; - - switch(upstream) { - case NOT_YET: - case NONE: - case STOP: - cleanup(); - if (upstream == IterOutcome.STOP) - return upstream; - break; - - case OK_NEW_SCHEMA: - try{ - setupNewSchema(); - }catch(Exception ex){ - kill(); - logger.error("Failure during query", ex); - context.fail(ex); - return IterOutcome.STOP; - } - // fall through. - case OK: - try { - counter += eventBasedRecordWriter.write(); - logger.debug("Total records written so far: {}", counter); - } catch(IOException ex) { - throw new RuntimeException(ex); - } - - for(VectorWrapper v : incoming) - v.getValueVector().clear(); - - break; - - default: - throw new UnsupportedOperationException(); - } - } while(upstream != IterOutcome.NONE); - - // Create two vectors for: - // 1. Fragment unique id. - // 2. Summary: currently contains number of records written. - MaterializedField fragmentIdField = MaterializedField.create(SchemaPath.getSimplePath("Fragment"), Types.required(MinorType.VARCHAR)); - MaterializedField summaryField = MaterializedField.create(SchemaPath.getSimplePath("Number of records written"), Types.required(MinorType.BIGINT)); - - VarCharVector fragmentIdVector = (VarCharVector) TypeHelper.getNewVector(fragmentIdField, context.getAllocator()); - AllocationHelper.allocate(fragmentIdVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR))); - BigIntVector summaryVector = (BigIntVector) TypeHelper.getNewVector(summaryField, context.getAllocator()); - AllocationHelper.allocate(summaryVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR))); - - - container.add(fragmentIdVector); - container.add(summaryVector); - container.buildSchema(BatchSchema.SelectionVectorMode.NONE); - - fragmentIdVector.getMutator().setSafe(0, fragmentUniqueId.getBytes()); - fragmentIdVector.getMutator().setValueCount(1); - summaryVector.getMutator().setSafe(0, counter); - summaryVector.getMutator().setValueCount(1); - - container.setRecordCount(1); - processed = true; - - return IterOutcome.OK_NEW_SCHEMA; - }finally{ - stats.stopProcessing(); - } + // Create two vectors for: + // 1. Fragment unique id. + // 2. Summary: currently contains number of records written. 
+ MaterializedField fragmentIdField = MaterializedField.create(SchemaPath.getSimplePath("Fragment"), Types.required(MinorType.VARCHAR)); + MaterializedField summaryField = MaterializedField.create(SchemaPath.getSimplePath("Number of records written"), Types.required(MinorType.BIGINT)); + + VarCharVector fragmentIdVector = (VarCharVector) TypeHelper.getNewVector(fragmentIdField, context.getAllocator()); + AllocationHelper.allocate(fragmentIdVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR))); + BigIntVector summaryVector = (BigIntVector) TypeHelper.getNewVector(summaryField, context.getAllocator()); + AllocationHelper.allocate(summaryVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR))); + + + container.add(fragmentIdVector); + container.add(summaryVector); + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + + fragmentIdVector.getMutator().setSafe(0, fragmentUniqueId.getBytes()); + fragmentIdVector.getMutator().setValueCount(1); + summaryVector.getMutator().setSafe(0, counter); + summaryVector.getMutator().setValueCount(1); + + container.setRecordCount(1); + processed = true; + return IterOutcome.OK_NEW_SCHEMA; } protected void setupNewSchema() throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index 4478938..ad929a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -86,65 +86,60 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { } @Override - public IterOutcome next() { - stats.startProcessing(); - try{ - // this is only called on the first batch. Beyond this, the aggregator manages batches. - if (aggregator == null) { - IterOutcome outcome = next(incoming); - logger.debug("Next outcome of {}", outcome); - switch (outcome) { - case NONE: - case NOT_YET: - case STOP: - return outcome; - case OK_NEW_SCHEMA: - if (!createAggregator()){ - done = true; - return IterOutcome.STOP; - } - break; - case OK: - throw new IllegalStateException("You should never get a first batch without a new schema"); - default: - throw new IllegalStateException(String.format("unknown outcome %s", outcome)); + public IterOutcome innerNext() { + // this is only called on the first batch. Beyond this, the aggregator manages batches. + if (aggregator == null) { + IterOutcome outcome = next(incoming); + logger.debug("Next outcome of {}", outcome); + switch (outcome) { + case NONE: + case NOT_YET: + case STOP: + return outcome; + case OK_NEW_SCHEMA: + if (!createAggregator()){ + done = true; + return IterOutcome.STOP; } + break; + case OK: + throw new IllegalStateException("You should never get a first batch without a new schema"); + default: + throw new IllegalStateException(String.format("unknown outcome %s", outcome)); } + } - if (aggregator.allFlushed()) { - return IterOutcome.NONE; - } - - if (aggregator.buildComplete() && ! 
aggregator.allFlushed()) { - // aggregation is complete and not all records have been output yet - return aggregator.outputCurrentBatch(); + if (aggregator.allFlushed()) { + return IterOutcome.NONE; } - logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount()); + if (aggregator.buildComplete() && ! aggregator.allFlushed()) { + // aggregation is complete and not all records have been output yet + return aggregator.outputCurrentBatch(); + } - while(true){ - AggOutcome out = aggregator.doWork(); - logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount()); - switch(out){ - case CLEANUP_AND_RETURN: - container.clear(); - aggregator.cleanup(); - done = true; - return aggregator.getOutcome(); - case RETURN_OUTCOME: - return aggregator.getOutcome(); - case UPDATE_AGGREGATOR: - aggregator = null; - if(!createAggregator()){ - return IterOutcome.STOP; - } - continue; - default: - throw new IllegalStateException(String.format("Unknown state %s.", out)); + logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount()); + + while(true){ + AggOutcome out = aggregator.doWork(); + logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount()); + switch(out){ + case CLEANUP_AND_RETURN: + container.clear(); + aggregator.cleanup(); + done = true; + return aggregator.getOutcome(); + case RETURN_OUTCOME: + return aggregator.getOutcome(); + case UPDATE_AGGREGATOR: + aggregator = null; + if(!createAggregator()){ + return IterOutcome.STOP; } + continue; + default: + throw new IllegalStateException(String.format("Unknown state %s.", out)); } - }finally{ - stats.stopProcessing(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index 5b61a82..8cad91b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -78,53 +78,48 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } @Override - public IterOutcome next() { - stats.startProcessing(); - try{ - // this is only called on the first batch. Beyond this, the aggregator manages batches. - if (aggregator == null) { - IterOutcome outcome = next(incoming); - logger.debug("Next outcome of {}", outcome); - switch (outcome) { - case NONE: - case NOT_YET: - case STOP: - return outcome; - case OK_NEW_SCHEMA: - if (!createAggregator()){ - done = true; - return IterOutcome.STOP; - } - break; - case OK: - throw new IllegalStateException("You should never get a first batch without a new schema"); - default: - throw new IllegalStateException(String.format("unknown outcome %s", outcome)); + public IterOutcome innerNext() { + // this is only called on the first batch. Beyond this, the aggregator manages batches. 
+ if (aggregator == null) { + IterOutcome outcome = next(incoming); + logger.debug("Next outcome of {}", outcome); + switch (outcome) { + case NONE: + case NOT_YET: + case STOP: + return outcome; + case OK_NEW_SCHEMA: + if (!createAggregator()){ + done = true; + return IterOutcome.STOP; } + break; + case OK: + throw new IllegalStateException("You should never get a first batch without a new schema"); + default: + throw new IllegalStateException(String.format("unknown outcome %s", outcome)); } + } - while(true){ - AggOutcome out = aggregator.doWork(); - logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount()); - switch(out){ - case CLEANUP_AND_RETURN: - container.clear(); - done = true; - return aggregator.getOutcome(); - case RETURN_OUTCOME: - return aggregator.getOutcome(); - case UPDATE_AGGREGATOR: - aggregator = null; - if(!createAggregator()){ - return IterOutcome.STOP; - } - continue; - default: - throw new IllegalStateException(String.format("Unknown state %s.", out)); + while(true){ + AggOutcome out = aggregator.doWork(); + logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount()); + switch(out){ + case CLEANUP_AND_RETURN: + container.clear(); + done = true; + return aggregator.getOutcome(); + case RETURN_OUTCOME: + return aggregator.getOutcome(); + case UPDATE_AGGREGATOR: + aggregator = null; + if(!createAggregator()){ + return IterOutcome.STOP; } + continue; + default: + throw new IllegalStateException(String.format("Unknown state %s.", out)); } - }finally{ - stats.stopProcessing(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 2ea9339..684965d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -143,8 +143,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { @Override - public IterOutcome next() { - stats.startProcessing(); + public IterOutcome innerNext() { try { /* If we are here for the first time, execute the build phase of the * hash join and setup the run time generated class for the probe side @@ -225,10 +224,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { context.fail(e); killIncoming(); return IterOutcome.STOP; - } finally{ - stats.stopProcessing(); } - } public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index b284454..587dd6a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -130,87 +130,80 @@ public class MergeJoinBatch 
extends AbstractRecordBatch<MergeJoinPOP> { } @Override - public IterOutcome next() { - stats.startProcessing(); - - try{ - // we do this in the here instead of the constructor because don't necessary want to start consuming on construction. - status.ensureInitial(); - - // loop so we can start over again if we find a new batch was created. - while(true){ - - JoinOutcome outcome = status.getOutcome(); - // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch. - if (outcome == JoinOutcome.BATCH_RETURNED || - outcome == JoinOutcome.SCHEMA_CHANGED) - allocateBatch(); - - // reset the output position to zero after our parent iterates this RecordBatch - if (outcome == JoinOutcome.BATCH_RETURNED || - outcome == JoinOutcome.SCHEMA_CHANGED || - outcome == JoinOutcome.NO_MORE_DATA) - status.resetOutputPos(); - - if (outcome == JoinOutcome.NO_MORE_DATA) { - logger.debug("NO MORE DATA; returning {} NONE"); - return IterOutcome.NONE; - } + public IterOutcome innerNext() { + // we do this in the here instead of the constructor because don't necessary want to start consuming on construction. + status.ensureInitial(); + + // loop so we can start over again if we find a new batch was created. + while(true){ + + JoinOutcome outcome = status.getOutcome(); + // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch. + if (outcome == JoinOutcome.BATCH_RETURNED || + outcome == JoinOutcome.SCHEMA_CHANGED) + allocateBatch(); + + // reset the output position to zero after our parent iterates this RecordBatch + if (outcome == JoinOutcome.BATCH_RETURNED || + outcome == JoinOutcome.SCHEMA_CHANGED || + outcome == JoinOutcome.NO_MORE_DATA) + status.resetOutputPos(); + + if (outcome == JoinOutcome.NO_MORE_DATA) { + logger.debug("NO MORE DATA; returning {} NONE"); + return IterOutcome.NONE; + } - boolean first = false; - if(worker == null){ - try { - logger.debug("Creating New Worker"); - stats.startSetup(); - this.worker = generateNewWorker(); - first = true; - stats.stopSetup(); - } catch (ClassTransformationException | IOException | SchemaChangeException e) { - stats.stopSetup(); - context.fail(new SchemaChangeException(e)); - kill(); - return IterOutcome.STOP; - } + boolean first = false; + if(worker == null){ + try { + logger.debug("Creating New Worker"); + stats.startSetup(); + this.worker = generateNewWorker(); + first = true; + stats.stopSetup(); + } catch (ClassTransformationException | IOException | SchemaChangeException e) { + stats.stopSetup(); + context.fail(new SchemaChangeException(e)); + kill(); + return IterOutcome.STOP; } + } - // join until we have a complete outgoing batch - if (!worker.doJoin(status)) - worker = null; - - // get the outcome of the join. - switch(status.getOutcome()){ - case BATCH_RETURNED: - // only return new schema if new worker has been setup. - logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK")); + // join until we have a complete outgoing batch + if (!worker.doJoin(status)) + worker = null; + + // get the outcome of the join. + switch(status.getOutcome()){ + case BATCH_RETURNED: + // only return new schema if new worker has been setup. + logger.debug("BATCH RETURNED; returning {}", (first ? "OK_NEW_SCHEMA" : "OK")); + setRecordCountInContainer(); + return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK; + case FAILURE: + kill(); + return IterOutcome.STOP; + case NO_MORE_DATA: + logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? 
"OK_NEW_SCHEMA" : "OK") : "NONE")); + setRecordCountInContainer(); + return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): IterOutcome.NONE; + case SCHEMA_CHANGED: + worker = null; + if(status.getOutPosition() > 0){ + // if we have current data, let's return that. + logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK")); setRecordCountInContainer(); return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK; - case FAILURE: - kill(); - return IterOutcome.STOP; - case NO_MORE_DATA: - logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : "NONE")); - setRecordCountInContainer(); - return status.getOutPosition() > 0 ? (first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK): IterOutcome.NONE; - case SCHEMA_CHANGED: - worker = null; - if(status.getOutPosition() > 0){ - // if we have current data, let's return that. - logger.debug("SCHEMA CHANGED; returning {} ", (first ? "OK_NEW_SCHEMA" : "OK")); - setRecordCountInContainer(); - return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK; - }else{ - // loop again to rebuild worker. - continue; - } - case WAITING: - return IterOutcome.NOT_YET; - default: - throw new IllegalStateException(); + }else{ + // loop again to rebuild worker. + continue; } + case WAITING: + return IterOutcome.NOT_YET; + default: + throw new IllegalStateException(); } - - }finally{ - stats.stopProcessing(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index 648fd89..3e408bd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -78,7 +78,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { } @Override - public IterOutcome next() { + public IterOutcome innerNext() { if(!noEndLimit && recordsLeft <= 0) { // don't kill incoming batches or call cleanup yet, as this could close allocators before the buffers have been cleared // Drain the incoming record batch and clear the memory @@ -96,7 +96,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { return IterOutcome.NONE; } - return super.next(); + return super.innerNext(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index be5bf76..07a949c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -121,10 +121,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } @Override - public IterOutcome next() { - 
stats.startProcessing(); - try{ - + public IterOutcome innerNext() { if (fragProviders.length == 0) return IterOutcome.NONE; boolean schemaChanged = false; @@ -336,11 +333,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> return IterOutcome.OK_NEW_SCHEMA; else return IterOutcome.OK; - - }finally{ - stats.stopProcessing(); - } - } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index 4c0d3e0..f677e54 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -424,103 +424,98 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } @Override - public IterOutcome next() { - stats.startProcessing(); - try{ - container.zeroVectors(); - - // if we got IterOutcome.NONE while getting partition vectors, and there are no batches on the queue, then we are - // done - if (upstreamNone && (batchQueue == null || batchQueue.size() == 0)) - return IterOutcome.NONE; - - // if there are batches on the queue, process them first, rather than calling incoming.next() - if (batchQueue != null && batchQueue.size() > 0) { - VectorContainer vc = batchQueue.poll(); - recordCount = vc.getRecordCount(); - try { - - // Must set up a new schema each time, because ValueVectors are not reused between containers in queue - setupNewSchema(vc); - } catch (SchemaChangeException ex) { - kill(); - logger.error("Failure during query", ex); - context.fail(ex); - return IterOutcome.STOP; - } - doWork(vc); - vc.zeroVectors(); - return IterOutcome.OK_NEW_SCHEMA; + public IterOutcome innerNext() { + container.zeroVectors(); + + // if we got IterOutcome.NONE while getting partition vectors, and there are no batches on the queue, then we are + // done + if (upstreamNone && (batchQueue == null || batchQueue.size() == 0)) + return IterOutcome.NONE; + + // if there are batches on the queue, process them first, rather than calling incoming.next() + if (batchQueue != null && batchQueue.size() > 0) { + VectorContainer vc = batchQueue.poll(); + recordCount = vc.getRecordCount(); + try { + + // Must set up a new schema each time, because ValueVectors are not reused between containers in queue + setupNewSchema(vc); + } catch (SchemaChangeException ex) { + kill(); + logger.error("Failure during query", ex); + context.fail(ex); + return IterOutcome.STOP; } + doWork(vc); + vc.zeroVectors(); + return IterOutcome.OK_NEW_SCHEMA; + } - // Reaching this point, either this is the first iteration, or there are no batches left on the queue and there are - // more incoming - IterOutcome upstream = next(incoming); + // Reaching this point, either this is the first iteration, or there are no batches left on the queue and there are + // more incoming + IterOutcome upstream = next(incoming); - if (this.first && upstream == IterOutcome.OK) { - throw new RuntimeException("Invalid state: First batch should have OK_NEW_SCHEMA"); - } - - // If this 
is the first iteration, we need to generate the partition vectors before we can proceed - if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) { - if (!getPartitionVectors()){ - cleanup(); - return IterOutcome.STOP; - } + if (this.first && upstream == IterOutcome.OK) { + throw new RuntimeException("Invalid state: First batch should have OK_NEW_SCHEMA"); + } - batchQueue = new LinkedBlockingQueue<>(this.sampledIncomingBatches); - first = false; - - // Now that we have the partition vectors, we immediately process the first batch on the queue - VectorContainer vc = batchQueue.poll(); - try { - setupNewSchema(vc); - } catch (SchemaChangeException ex) { - kill(); - logger.error("Failure during query", ex); - context.fail(ex); - return IterOutcome.STOP; - } - doWork(vc); - vc.zeroVectors(); - recordCount = vc.getRecordCount(); - return IterOutcome.OK_NEW_SCHEMA; + // If this is the first iteration, we need to generate the partition vectors before we can proceed + if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) { + if (!getPartitionVectors()){ + cleanup(); + return IterOutcome.STOP; } - // if this now that all the batches on the queue are processed, we begin processing the incoming batches. For the - // first one - // we need to generate a new schema, even if the outcome is IterOutcome.OK After that we can reuse the schema. - if (this.startedUnsampledBatches == false) { - this.startedUnsampledBatches = true; - if (upstream == IterOutcome.OK) - upstream = IterOutcome.OK_NEW_SCHEMA; + batchQueue = new LinkedBlockingQueue<>(this.sampledIncomingBatches); + first = false; + + // Now that we have the partition vectors, we immediately process the first batch on the queue + VectorContainer vc = batchQueue.poll(); + try { + setupNewSchema(vc); + } catch (SchemaChangeException ex) { + kill(); + logger.error("Failure during query", ex); + context.fail(ex); + return IterOutcome.STOP; } - switch (upstream) { - case NONE: - case NOT_YET: - case STOP: - cleanup(); - recordCount = 0; - return upstream; - case OK_NEW_SCHEMA: - try { - setupNewSchema(incoming); - } catch (SchemaChangeException ex) { - kill(); - logger.error("Failure during query", ex); - context.fail(ex); - return IterOutcome.STOP; - } - // fall through. - case OK: - doWork(incoming); - recordCount = incoming.getRecordCount(); - return upstream; // change if upstream changed, otherwise normal. - default: - throw new UnsupportedOperationException(); + doWork(vc); + vc.zeroVectors(); + recordCount = vc.getRecordCount(); + return IterOutcome.OK_NEW_SCHEMA; + } + + // if this now that all the batches on the queue are processed, we begin processing the incoming batches. For the + // first one + // we need to generate a new schema, even if the outcome is IterOutcome.OK After that we can reuse the schema. + if (this.startedUnsampledBatches == false) { + this.startedUnsampledBatches = true; + if (upstream == IterOutcome.OK) + upstream = IterOutcome.OK_NEW_SCHEMA; + } + switch (upstream) { + case NONE: + case NOT_YET: + case STOP: + cleanup(); + recordCount = 0; + return upstream; + case OK_NEW_SCHEMA: + try { + setupNewSchema(incoming); + } catch (SchemaChangeException ex) { + kill(); + logger.error("Failure during query", ex); + context.fail(ex); + return IterOutcome.STOP; } - }finally{ - stats.stopProcessing(); + // fall through. + case OK: + doWork(incoming); + recordCount = incoming.getRecordCount(); + return upstream; // change if upstream changed, otherwise normal. 
+ default: + throw new UnsupportedOperationException(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index ffb3780..5476a50 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -33,6 +33,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.HashPartitionSender; +import org.apache.drill.exec.physical.impl.BaseRootExec; import org.apache.drill.exec.physical.impl.RootExec; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch; @@ -48,17 +49,15 @@ import com.sun.codemodel.JType; import org.apache.drill.exec.vector.CopyUtil; -public class PartitionSenderRootExec implements RootExec { +public class PartitionSenderRootExec extends BaseRootExec { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionSenderRootExec.class); private RecordBatch incoming; private HashPartitionSender operator; private Partitioner partitioner; private FragmentContext context; - private OperatorContext oContext; private boolean ok = true; private final SendingAccountor sendCount = new SendingAccountor(); - private final OperatorStats stats; private final int outGoingBatchCount; private final HashPartitionSender popConfig; private final StatusHandler statusHandler; @@ -68,18 +67,17 @@ public class PartitionSenderRootExec implements RootExec { RecordBatch incoming, HashPartitionSender operator) throws OutOfMemoryException { + super(context, operator); this.incoming = incoming; this.operator = operator; this.context = context; - this.oContext = new OperatorContext(operator, context); - this.stats = oContext.getStats(); this.outGoingBatchCount = operator.getDestinations().size(); this.popConfig = operator; this.statusHandler = new StatusHandler(sendCount, context); } @Override - public boolean next() { + public boolean innerNext() { boolean newSchema = false; if (!ok) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 05a6724..93cd19d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -75,12 +75,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } @Override - public IterOutcome next() { + public IterOutcome innerNext() { if (hasRemainder) { handleRemainder(); return 
IterOutcome.OK; } - return super.next(); + return super.innerNext(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java index 375276e..f21673d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java @@ -98,7 +98,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> { } @Override - public IterOutcome next() { + public IterOutcome innerNext() { if(schema != null){ if(getSelectionVector4().next()){ return IterOutcome.OK; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index fd06de1..f3388dc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -83,12 +83,12 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect } @Override - public IterOutcome next() { + public IterOutcome innerNext() { if (hasRemainder) { handleRemainder(); return IterOutcome.OK; } - return super.next(); + return super.innerNext(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java index c27b3c8..d515323 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java @@ -80,7 +80,7 @@ public class UnionRecordBatch extends AbstractRecordBatch<Union> { } @Override - public IterOutcome next() { + public IterOutcome innerNext() { if (current == null) { // end of iteration return IterOutcome.NONE; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 7a2b251..d4c0b25 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -159,7 +159,7 @@ public class ExternalSortBatch 
extends AbstractRecordBatch<ExternalSort> { } @Override - public IterOutcome next() { + public IterOutcome innerNext() { if(schema != null){ if (spillCount == 0) { if(schema != null){ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index 53b223e..7b7b708 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -59,32 +59,36 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements return popConfig; } - public final IterOutcome next(RecordBatch b){ + public final IterOutcome next(RecordBatch b) { + return next(0, b); } public final IterOutcome next(int inputIndex, RecordBatch b){ - stats.stopProcessing(); - try{ - IterOutcome next = b.next(); - - switch(next){ - case OK_NEW_SCHEMA: - stats.batchReceived(inputIndex, b.getRecordCount(), true); - break; - case OK: - stats.batchReceived(inputIndex, b.getRecordCount(), false); - break; - } - - return next; - - }finally{ - stats.startProcessing(); + IterOutcome next = b.next(); + + switch(next){ + case OK_NEW_SCHEMA: + stats.batchReceived(inputIndex, b.getRecordCount(), true); + break; + case OK: + stats.batchReceived(inputIndex, b.getRecordCount(), false); + break; } + return next; + } + public final IterOutcome next() { + try { + stats.startProcessing(); + return innerNext(); + } finally { + stats.stopProcessing(); + } } + public abstract IterOutcome innerNext(); + @Override public BatchSchema getSchema() { return container.getSchema(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3e98ffcb/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java index d897a78..c5fdaeb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java @@ -41,47 +41,40 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte } @Override - public IterOutcome next() { - try{ - stats.startProcessing(); - IterOutcome upstream = next(incoming); - if(first && upstream == IterOutcome.OK) upstream = IterOutcome.OK_NEW_SCHEMA; - first = false; - switch(upstream){ - case NONE: - case NOT_YET: - case STOP: - return upstream; - case OUT_OF_MEMORY: - return upstream; - case OK_NEW_SCHEMA: - try{ - stats.startSetup(); - setupNewSchema(); - }catch(SchemaChangeException ex){ - kill(); - logger.error("Failure during query", ex); - context.fail(ex); - return IterOutcome.STOP; - }finally{ - stats.stopSetup(); - } - // fall through. - case OK: - doWork(); - if (outOfMemory) { - outOfMemory = false; - return IterOutcome.OUT_OF_MEMORY; - } - return upstream; // change if upstream changed, otherwise normal. 
- default: - throw new UnsupportedOperationException(); + public IterOutcome innerNext() { + IterOutcome upstream = next(incoming); + if(first && upstream == IterOutcome.OK) upstream = IterOutcome.OK_NEW_SCHEMA; + first = false; + switch(upstream){ + case NONE: + case NOT_YET: + case STOP: + return upstream; + case OUT_OF_MEMORY: + return upstream; + case OK_NEW_SCHEMA: + try{ + stats.startSetup(); + setupNewSchema(); + }catch(SchemaChangeException ex){ + kill(); + logger.error("Failure during query", ex); + context.fail(ex); + return IterOutcome.STOP; + }finally{ + stats.stopSetup(); } - }finally{ - stats.stopProcessing(); + // fall through. + case OK: + doWork(); + if (outOfMemory) { + outOfMemory = false; + return IterOutcome.OUT_OF_MEMORY; + } + return upstream; // change if upstream changed, otherwise normal. + default: + throw new UnsupportedOperationException(); } - - } @Override
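On the sender side, BaseRootExec gives RootExec implementations the same shape: PartitionSenderRootExec drops its own OperatorContext and OperatorStats fields, calls super(context, operator), and renames next() to innerNext(), as its diff above shows. Below is a condensed, self-contained sketch of what a sender subclass looks like under this scheme; the types are simplified stand-ins rather than Drill's real FragmentContext, HashPartitionSender, and OperatorStats.

    // Sketch only: the stats-wrapped next()/innerNext() split from BaseRootExec,
    // plus a toy sender that inherits it. StatsSketch stands in for OperatorStats.
    abstract class RootExecSketch {

      static final class StatsSketch {
        private long start, totalNanos;
        void startProcessing() { start = System.nanoTime(); }
        void stopProcessing()  { totalNanos += System.nanoTime() - start; }
      }

      protected final StatsSketch stats = new StatsSketch();

      // Final for the same reason as AbstractRecordBatch.next(): the timing
      // bracket is inherited, never reimplemented.
      public final boolean next() {
        try {
          stats.startProcessing();
          return innerNext();
        } finally {
          stats.stopProcessing();
        }
      }

      protected abstract boolean innerNext();
    }

    // A sender now supplies only its send loop.
    final class SenderSketch extends RootExecSketch {
      private boolean done;

      @Override
      protected boolean innerNext() {
        if (done) {
          return false;            // upstream exhausted, fragment can finish
        }
        // ... partition the current batch and hand it to the wire here ...
        done = true;               // toy behavior: one batch, then done
        return true;
      }
    }

Because both hierarchies now funnel through a final next(), the per-operator try/finally blocks removed throughout this diff cannot drift out of sync again; an operator only implements innerNext() and can no longer forget the bracketing.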
