Updated Branches: refs/heads/master c287fa604 -> d17f48315
DRILL-254: Add iterator validator and correct interface violations Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d17f4831 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d17f4831 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d17f4831 Branch: refs/heads/master Commit: d17f483158493aca8a971c1674bf0d31975675df Parents: c287fa6 Author: Jacques Nadeau <[email protected]> Authored: Fri Oct 25 09:32:44 2013 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Sat Nov 9 20:19:13 2013 -0800 ---------------------------------------------------------------------- .../physical/base/AbstractPhysicalVisitor.java | 5 + .../exec/physical/base/PhysicalVisitor.java | 2 + .../exec/physical/config/IteratorValidator.java | 48 ++++++++ .../drill/exec/physical/impl/ImplCreator.java | 16 ++- .../drill/exec/physical/impl/ScreenCreator.java | 26 ++--- .../exec/physical/impl/SingleSenderCreator.java | 2 +- .../exec/physical/impl/WireRecordBatch.java | 7 ++ .../OrderedPartitionRecordBatch.java | 17 ++- .../IteratorValidatorBatchIterator.java | 116 +++++++++++++++++++ .../impl/validate/IteratorValidatorCreator.java | 39 +++++++ .../validate/IteratorValidatorInjector.java | 74 ++++++++++++ .../exec/record/FragmentWritableBatch.java | 20 +++- 12 files changed, 344 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d17f4831/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index 9e2ef0a..d1188bf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -141,6 +141,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme } @Override + public T visitIteratorValidator(IteratorValidator op, X value) throws E { + return visitOp(op, value); + } + + @Override public T visitOp(PhysicalOperator op, X value) throws E{ throw new UnsupportedOperationException(String.format( "The PhysicalVisitor of type %s does not currently support visiting the PhysicalOperator type %s.", this http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d17f4831/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java index 2474c15..120306a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java @@ -55,4 +55,6 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP; public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP; public RETURN visitUnionExchange(UnionExchange op, EXTRA value) throws EXCEP; + + public RETURN visitIteratorValidator(IteratorValidator op, EXTRA value) throws EXCEP; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d17f4831/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java new file mode 100644 index 0000000..67bba96 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/IteratorValidator.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.config; + +import org.apache.drill.exec.physical.OperatorCost; +import org.apache.drill.exec.physical.base.AbstractSingle; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; + +public class IteratorValidator extends AbstractSingle{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidator.class); + + public IteratorValidator(PhysicalOperator child) { + super(child); + + } + + @Override + public OperatorCost getCost() { + return new OperatorCost(0f, 0f, 0f, 0f); + } + + @Override + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitIteratorValidator(this, value); + } + + @Override + protected PhysicalOperator getNewWithChild(PhysicalOperator child) { + return new IteratorValidator(child); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d17f4831/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java index 67e9452..366be22 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java @@ -28,7 +28,6 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.SubScan; import org.apache.drill.exec.physical.config.*; import org.apache.drill.exec.physical.impl.aggregate.AggBatchCreator; -import org.apache.drill.exec.physical.config.Union; import org.apache.drill.exec.physical.impl.filter.FilterBatchCreator; import org.apache.drill.exec.physical.impl.join.MergeJoinCreator; import org.apache.drill.exec.physical.impl.limit.LimitBatchCreator; @@ -39,6 +38,8 @@ import org.apache.drill.exec.physical.impl.sort.SortBatchCreator; import org.apache.drill.exec.physical.impl.svremover.SVRemoverCreator; import org.apache.drill.exec.physical.impl.trace.TraceBatchCreator; import org.apache.drill.exec.physical.impl.union.UnionBatchCreator; +import org.apache.drill.exec.physical.impl.validate.IteratorValidatorCreator; +import org.apache.drill.exec.physical.impl.validate.IteratorValidatorInjector; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.store.json.JSONScanBatchCreator; import org.apache.drill.exec.store.json.JSONSubScan; @@ -73,6 +74,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo private SortBatchCreator sbc = new SortBatchCreator(); private AggBatchCreator abc = new AggBatchCreator(); private MergeJoinCreator mjc = new MergeJoinCreator(); + private IteratorValidatorCreator ivc = new IteratorValidatorCreator(); private RootExec root = null; private TraceBatchCreator tbc = new TraceBatchCreator(); @@ -190,12 +192,24 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo return children; } + @Override + public RecordBatch visitIteratorValidator(IteratorValidator op, FragmentContext context) throws ExecutionSetupException { + return ivc.getBatch(context, op, getChildren(op, context)); + } + public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException { ImplCreator i = new ImplCreator(); + boolean isAssertEnabled = false; + assert isAssertEnabled = true; + if(isAssertEnabled){ + root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root); + } root.accept(i, context); if (i.root == null) throw new ExecutionSetupException( "The provided fragment did not have a root node that correctly created a RootExec value."); return i.getRoot(); } + + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d17f4831/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index 4d1e3ce..e1fb3ae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -91,23 +91,15 @@ public class ScreenCreator implements RootCreator<Screen>{ return false; } case NONE: { - if(materializer == null){ - // receive no results. - context.batchesCompleted.inc(1); - context.recordsCompleted.inc(incoming.getRecordCount()); - QueryResult header = QueryResult.newBuilder() // - .setQueryId(context.getHandle().getQueryId()) // - .setRowCount(0) // - .setDef(RecordBatchDef.getDefaultInstance()) // - .setIsLastChunk(true) // - .build(); - QueryWritableBatch batch = new QueryWritableBatch(header); - connection.sendResult(listener, batch); - - }else{ - QueryWritableBatch batch = materializer.convertNext(true); - connection.sendResult(listener, batch); - } + context.batchesCompleted.inc(1); + QueryResult header = QueryResult.newBuilder() // + .setQueryId(context.getHandle().getQueryId()) // + .setRowCount(0) // + .setDef(RecordBatchDef.getDefaultInstance()) // + .setIsLastChunk(true) // + .build(); + QueryWritableBatch batch = new QueryWritableBatch(header); + connection.sendResult(listener, batch); return false; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d17f4831/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java index ca9929f..1d1d420 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java @@ -73,7 +73,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ switch(out){ case STOP: case NONE: - FragmentWritableBatch b2 = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch()); + FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0); tunnel.sendRecordBatch(new RecordSendFailure(), context, b2); return false; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d17f4831/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java index b63a4d0..0b0214a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java @@ -97,8 +97,15 @@ public class WireRecordBatch implements RecordBatch{ @Override public IterOutcome next() { RawFragmentBatch batch = fragProvider.getNext(); + + // skip over empty batches. we do this since these are basically control messages. + while(batch != null && batch.getHeader().getDef().getRecordCount() == 0){ + batch = fragProvider.getNext(); + } + try{ if (batch == null) return IterOutcome.NONE; + logger.debug("Next received batch {}", batch); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d17f4831/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index 317e705..a3d1d09 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -83,7 +83,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart private boolean upstreamNone = false; private int recordCount; private DistributedMap<VectorAccessibleSerializable> tableMap; - private DistributedMultiMap mmap; + private DistributedMultiMap<?> mmap; private String mapKey; public OrderedPartitionRecordBatch(OrderedPartitionSender pop, RecordBatch incoming, FragmentContext context){ @@ -123,6 +123,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart case STOP: upstreamNone = true; break outer; + default: + // fall through } builder.add(incoming); recordsSampled += incoming.getRecordCount(); @@ -144,7 +146,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart SampleCopier copier = getCopier(sv4, sampleContainer, containerToCache, popConfig.getOrderings()); copier.copyRecords(recordsSampled/(samplingFactor * partitions), 0, samplingFactor * partitions); - for (VectorWrapper vw : containerToCache) { + for (VectorWrapper<?> vw : containerToCache) { vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords()); } containerToCache.setRecordCount(copier.getOutputRecords()); @@ -156,7 +158,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart mapKey = String.format("%s_%d", context.getHandle().getQueryId(), context.getHandle().getMajorFragmentId()); mmap = cache.getMultiMap(VectorAccessibleSerializable.class); List<ValueVector> vectorList = Lists.newArrayList(); - for (VectorWrapper vw : containerToCache) { + for (VectorWrapper<?> vw : containerToCache) { vectorList.add(vw.getValueVector()); } @@ -193,7 +195,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart Preconditions.checkState(wrap != null); // Extract vectors from the wrapper, and add to partition vectors. These vectors will be used for partitioning in the rest of this operator - for (VectorWrapper w : wrap.get()) { + for (VectorWrapper<?> w : wrap.get()) { partitionVectors.add(w.getValueVector()); } } catch (ClassTransformationException | IOException | SchemaChangeException | InterruptedException ex) { @@ -235,7 +237,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart int skipRecords = containerBuilder.getSv4().getTotalCount() / partitions; copier2.copyRecords(skipRecords, skipRecords, partitions - 1); assert copier2.getOutputRecords() == partitions - 1 : String.format("output records: %d partitions: %d", copier2.getOutputRecords(), partitions); - for (VectorWrapper vw : candidatePartitionTable) { + for (VectorWrapper<?> vw : candidatePartitionTable) { vw.getValueVector().getMutator().setValueCount(copier2.getOutputRecords()); } candidatePartitionTable.setRecordCount(copier2.getOutputRecords()); @@ -324,7 +326,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // Reaching this point, either this is the first iteration, or there are no batches left on the queue and there are more incoming IterOutcome upstream = incoming.next(); - recordCount = incoming.getRecordCount(); + if(this.first && upstream == IterOutcome.OK) { throw new RuntimeException("Invalid state: First batch should have OK_NEW_SCHEMA"); @@ -347,6 +349,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart return IterOutcome.STOP; } doWork(vc); + recordCount = vc.getRecordCount(); return IterOutcome.OK_NEW_SCHEMA; } @@ -361,6 +364,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart case NOT_YET: case STOP: container.zeroVectors(); + recordCount = 0; return upstream; case OK_NEW_SCHEMA: try{ @@ -374,6 +378,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // fall through. case OK: doWork(incoming); + recordCount = incoming.getRecordCount(); return upstream; // change if upstream changed, otherwise normal. default: throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d17f4831/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java new file mode 100644 index 0000000..d6f08f1 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.validate; + +import java.util.Iterator; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; + +public class IteratorValidatorBatchIterator implements RecordBatch{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorBatchIterator.class); + + private IterOutcome state = IterOutcome.NOT_YET; + private final RecordBatch incoming; + + public IteratorValidatorBatchIterator(RecordBatch incoming){ + this.incoming = incoming; + } + + private void validateReadState(){ + switch(state){ + case OK: + case OK_NEW_SCHEMA: + return; + default: + throw new IllegalStateException(String.format("You tried to do a batch data read operation when you were in a state of %s. You can only do this type of operation when you are in a state of OK or OK_NEW_SCHEMA.", state.name())); + } + } + + @Override + public Iterator<VectorWrapper<?>> iterator() { + validateReadState(); + return incoming.iterator(); + } + + @Override + public FragmentContext getContext() { + return incoming.getContext(); + } + + @Override + public BatchSchema getSchema() { + validateReadState(); + return incoming.getSchema(); + } + + @Override + public int getRecordCount() { + validateReadState(); + return incoming.getRecordCount(); + } + + @Override + public void kill() { + incoming.kill(); + } + + @Override + public SelectionVector2 getSelectionVector2() { + validateReadState(); + return incoming.getSelectionVector2(); + } + + @Override + public SelectionVector4 getSelectionVector4() { + validateReadState(); + return incoming.getSelectionVector4(); + } + + @Override + public TypedFieldId getValueVectorId(SchemaPath path) { + validateReadState(); + return incoming.getValueVectorId(path); + } + + @Override + public VectorWrapper<?> getValueAccessorById(int fieldId, Class<?> clazz) { + validateReadState(); + return incoming.getValueAccessorById(fieldId, clazz); + } + + @Override + public IterOutcome next() { + if(state == IterOutcome.NONE ) throw new IllegalStateException("The incoming iterator has previously moved to a state of NONE. You should not be attempting to call next() again."); + state = incoming.next(); + return state; + } + + @Override + public WritableBatch getWritableBatch() { + validateReadState(); + return incoming.getWritableBatch(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d17f4831/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java new file mode 100644 index 0000000..5d08afb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.validate; + +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.IteratorValidator; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.record.RecordBatch; + +import com.google.common.base.Preconditions; + +public class IteratorValidatorCreator implements BatchCreator<IteratorValidator>{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorCreator.class); + + @Override + public RecordBatch getBatch(FragmentContext context, IteratorValidator config, List<RecordBatch> children) + throws ExecutionSetupException { + Preconditions.checkArgument(children.size() == 1); + return new IteratorValidatorBatchIterator(children.iterator().next()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d17f4831/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java new file mode 100644 index 0000000..aff71bf --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorInjector.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.validate; + +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; +import org.apache.drill.exec.physical.base.FragmentRoot; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.IteratorValidator; + +import com.google.common.collect.Lists; + +public class IteratorValidatorInjector extends + AbstractPhysicalVisitor<PhysicalOperator, FragmentContext, ExecutionSetupException> { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorInjector.class); + + public static FragmentRoot rewritePlanWithIteratorValidator(FragmentContext context, FragmentRoot root) throws ExecutionSetupException { + IteratorValidatorInjector inject = new IteratorValidatorInjector(); + PhysicalOperator newOp = root.accept(inject, context); + + if( !(newOp instanceof FragmentRoot) ) throw new IllegalStateException("This shouldn't happen."); + + return (FragmentRoot) newOp; + + } + + /** + * Traverse the physical plan and inject the IteratorValidator operator after every operator. + * + * @param op + * Physical operator under which the IteratorValidator operator will be injected + * @param context + * Fragment context + * @return same physical operator as passed in, but its child will be a IteratorValidator operator whose child will be the + * original child of this operator + * @throws ExecutionSetupException + */ + @Override + public PhysicalOperator visitOp(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException { + + List<PhysicalOperator> newChildren = Lists.newArrayList(); + PhysicalOperator newOp = op; + + /* Get the list of child operators */ + for (PhysicalOperator child : op) { + newChildren.add(new IteratorValidator(child.accept(this, context))); + } + + /* Inject trace operator */ + if (newChildren.size() > 0) + newOp = op.getNewWithChildren(newChildren); + + return newOp; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d17f4831/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java index a1f22c2..41ad8b7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java @@ -22,15 +22,22 @@ import io.netty.buffer.ByteBuf; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.ExecProtos.FragmentRecordBatch; import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; public class FragmentWritableBatch{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentWritableBatch.class); + private static RecordBatchDef EMPTY_DEF = RecordBatchDef.newBuilder().setRecordCount(0).build(); + private final ByteBuf[] buffers; private final FragmentRecordBatch header; - + public FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId, WritableBatch batch){ - this.buffers = batch.getBuffers(); + this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentId, batch.getDef(), batch.getBuffers()); + } + + private FragmentWritableBatch(boolean isLast, QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId, RecordBatchDef def, ByteBuf... buffers){ + this.buffers = buffers; FragmentHandle handle = FragmentHandle // .newBuilder() // .setMajorFragmentId(receiveMajorFragmentId) // @@ -40,12 +47,17 @@ public class FragmentWritableBatch{ this.header = FragmentRecordBatch // .newBuilder() // .setIsLastBatch(isLast) // - .setDef(batch.getDef()) // + .setDef(def) // .setHandle(handle) // .setSendingMajorFragmentId(sendMajorFragmentId) // .setSendingMinorFragmentId(sendMinorFragmentId) // .build(); } + + + public static FragmentWritableBatch getEmptyLast(QueryId queryId, int sendMajorFragmentId, int sendMinorFragmentId, int receiveMajorFragmentId, int receiveMinorFragmentId){ + return new FragmentWritableBatch(true, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentId, EMPTY_DEF); + } public ByteBuf[] getBuffers(){ return buffers; @@ -53,9 +65,11 @@ public class FragmentWritableBatch{ public FragmentRecordBatch getHeader() { return header; + } + }
