DRILL-991: Limit should terminate upstream fragments immediately upon completion
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c331aed8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c331aed8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c331aed8 Branch: refs/heads/master Commit: c331aed81e73d16ea29bf8c94863591b212aa644 Parents: 5e482c1 Author: Steven Phillips <[email protected]> Authored: Wed Jul 23 20:03:07 2014 -0700 Committer: Steven Phillips <[email protected]> Committed: Fri Jul 25 18:33:53 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/physical/impl/BaseRootExec.java | 6 + .../drill/exec/physical/impl/RootExec.java | 8 + .../drill/exec/physical/impl/ScanBatch.java | 8 +- .../exec/physical/impl/SingleSenderCreator.java | 20 +- .../exec/physical/impl/TopN/TopNBatch.java | 13 +- .../exec/physical/impl/WriterRecordBatch.java | 6 +- .../physical/impl/aggregate/HashAggBatch.java | 6 +- .../impl/aggregate/StreamingAggBatch.java | 6 +- .../exec/physical/impl/join/HashJoinBatch.java | 9 +- .../exec/physical/impl/join/MergeJoinBatch.java | 10 +- .../physical/impl/limit/LimitRecordBatch.java | 4 +- .../impl/mergereceiver/MergingRecordBatch.java | 52 +- .../OrderedPartitionRecordBatch.java | 12 +- .../partitionsender/PartitionOutgoingBatch.java | 26 + .../PartitionSenderRootExec.java | 57 +- .../partitionsender/PartitionStatsBatch.java | 24 - .../impl/partitionsender/Partitioner.java | 3 +- .../partitionsender/PartitionerTemplate.java | 27 +- .../impl/producer/ProducerConsumerBatch.java | 2 +- .../exec/physical/impl/sort/SortBatch.java | 11 +- .../impl/union/UnionAllRecordBatch.java | 10 +- .../UnorderedReceiverBatch.java | 47 +- .../IteratorValidatorBatchIterator.java | 4 +- .../physical/impl/xsort/ExternalSortBatch.java | 10 +- .../exec/planner/physical/PlannerSettings.java | 2 +- .../drill/exec/record/AbstractRecordBatch.java | 6 +- 
.../exec/record/AbstractSingleRecordBatch.java | 6 +- .../apache/drill/exec/record/RecordBatch.java | 2 +- .../exec/rpc/control/ControlRpcConfig.java | 2 + .../drill/exec/rpc/control/ControlTunnel.java | 22 +- .../exec/work/batch/ControlHandlerImpl.java | 22 + .../exec/work/fragment/FragmentExecutor.java | 5 + .../exec/physical/impl/SimpleRootExec.java | 6 + .../org/apache/drill/exec/proto/BitControl.java | 810 ++++++++++++++++++- .../drill/exec/proto/SchemaBitControl.java | 122 +++ .../exec/proto/beans/FinishedReceiver.java | 189 +++++ protocol/src/main/protobuf/BitControl.proto | 20 +- 37 files changed, 1418 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java index fa6c997..c2c3144 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java @@ -23,6 +23,7 @@ import org.apache.drill.exec.ops.OpProfileDef; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; @@ -72,4 +73,9 @@ public abstract class BaseRootExec implements RootExec { } public abstract boolean innerNext(); + + @Override + public void receivingFragmentFinished(FragmentHandle handle) { + logger.warn("Currently not handling FinishedFragment message"); + } } 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java index fcc10aa..42ac4f6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java @@ -18,6 +18,8 @@ package org.apache.drill.exec.physical.impl; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; + /** * A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange * output nodes and storage nodes. They are there driving force behind the completion of a query. @@ -35,5 +37,11 @@ public interface RootExec { * Inform all children to clean up and go away. 
*/ public void stop(); + + /** + * Inform sender that receiving fragment is finished and doesn't need any more data + * @param handle + */ + public void receivingFragmentFinished(FragmentHandle handle); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index a8881f0..21a580b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -119,8 +119,12 @@ public class ScanBatch implements RecordBatch { } @Override - public void kill() { - releaseAssets(); + public void kill(boolean sendUpstream) { + if (sendUpstream) { + done = true; + } else { + releaseAssets(); + } } private void releaseAssets() { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java index 325e315..26aa5ab 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java @@ -56,6 +56,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ private int recMajor; private FragmentContext context; private volatile boolean ok = true; + private volatile boolean done = false; private final SendingAccountor sendCount = new 
SendingAccountor(); public enum Metric implements MetricDef { @@ -81,11 +82,18 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ @Override public boolean innerNext() { if(!ok){ - incoming.kill(); + incoming.kill(false); return false; } - IterOutcome out = next(incoming); + + IterOutcome out; + if (!done) { + out = next(incoming); + } else { + incoming.kill(true); + out = IterOutcome.NONE; + } // logger.debug("Outcome of sender next {}", out); switch(out){ case STOP: @@ -132,8 +140,12 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ oContext.close(); incoming.cleanup(); } - - + + @Override + public void receivingFragmentFinished(FragmentHandle handle) { + done = true; + } + private class RecordSendFailure extends BaseRpcOutcomeListener<Ack>{ @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 0132e85..fb9554c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -101,11 +101,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { } @Override - public void kill() { - incoming.kill(); - } - - @Override public SelectionVector2 getSelectionVector2() { throw new UnsupportedOperationException(); } @@ -203,7 +198,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { return IterOutcome.OK_NEW_SCHEMA; }catch(SchemaChangeException | ClassTransformationException | IOException ex){ - kill(); + kill(false); logger.error("Failure during query", ex); context.fail(ex); return IterOutcome.STOP; @@ -297,8 +292,8 @@ 
public class TopNBatch extends AbstractRecordBatch<TopN> { } @Override - protected void killIncoming() { - incoming.kill(); + protected void killIncoming(boolean sendUpstream) { + incoming.kill(sendUpstream); } @@ -334,7 +329,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { } @Override - public void kill() { + public void kill(boolean sendUpstream) { } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java index 43e0dd4..29b346d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java @@ -67,8 +67,8 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { } @Override - protected void killIncoming() { - incoming.kill(); + protected void killIncoming(boolean sendUpstream) { + incoming.kill(sendUpstream); } @Override @@ -100,7 +100,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { try{ setupNewSchema(); }catch(Exception ex){ - kill(); + kill(false); logger.error("Failure during query", ex); context.fail(ex); return IterOutcome.STOP; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index b30a357..393fa4f 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -180,7 +180,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { }catch(SchemaChangeException | ClassTransformationException | IOException ex){ context.fail(ex); container.clear(); - incoming.kill(); + incoming.kill(false); return false; }finally{ stats.stopSetup(); @@ -301,8 +301,8 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { } @Override - protected void killIncoming() { - incoming.kill(); + protected void killIncoming(boolean sendUpstream) { + incoming.kill(sendUpstream); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index 2f71bf9..3913112 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -157,7 +157,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { }catch(SchemaChangeException | ClassTransformationException | IOException ex){ context.fail(ex); container.clear(); - incoming.kill(); + incoming.kill(false); return false; }finally{ stats.stopSetup(); @@ -338,8 +338,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } @Override - protected void killIncoming() { - incoming.kill(); + protected void killIncoming(boolean sendUpstream) { + incoming.kill(sendUpstream); } } 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 46f7d51..7233f69 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -242,6 +242,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { for (VectorWrapper<?> wrapper : left) { wrapper.getValueVector().clear(); } + left.kill(true); leftUpstream = next(HashJoinHelper.LEFT_INPUT, left); while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) { for (VectorWrapper<?> wrapper : left) { @@ -260,7 +261,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { return IterOutcome.NONE; } catch (ClassTransformationException | SchemaChangeException | IOException e) { context.fail(e); - killIncoming(); + killIncoming(false); return IterOutcome.STOP; } } @@ -483,9 +484,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { } @Override - public void killIncoming() { - this.left.kill(); - this.right.kill(); + public void killIncoming(boolean sendUpstream) { + this.left.kill(sendUpstream); + this.right.kill(sendUpstream); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 0c6657c..24ca463 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -172,7 +172,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { first = true; } catch (ClassTransformationException | IOException | SchemaChangeException e) { context.fail(new SchemaChangeException(e)); - kill(); + kill(false); return IterOutcome.STOP; } finally { stats.stopSetup(); @@ -191,7 +191,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { setRecordCountInContainer(); return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK; case FAILURE: - kill(); + kill(false); return IterOutcome.STOP; case NO_MORE_DATA: logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : (first ? 
"OK_NEW_SCHEMA" :"NONE"))); @@ -233,9 +233,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } @Override - protected void killIncoming() { - left.kill(); - right.kill(); + protected void killIncoming(boolean sendUpstream) { + left.kill(sendUpstream); + right.kill(sendUpstream); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index 078c4c4..12ee406 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -83,8 +83,8 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { @Override public IterOutcome innerNext() { if(!noEndLimit && recordsLeft <= 0) { - // don't kill incoming batches or call cleanup yet, as this could close allocators before the buffers have been cleared - // Drain the incoming record batch and clear the memory + incoming.kill(true); + IterOutcome upStream = incoming.next(); while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index 313fdec..b8e18af 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.PriorityQueue; +import io.netty.buffer.ByteBuf; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; @@ -43,6 +44,9 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.physical.config.MergingReceiverPOP; +import org.apache.drill.exec.proto.BitControl.FinishedReceiver; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; @@ -60,6 +64,8 @@ import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.vector.CopyUtil; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; @@ -87,6 +93,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> private BatchSchema schema; private VectorContainer outgoingContainer; private MergingReceiverGeneratorBase merger; + private MergingReceiverPOP config; private boolean hasRun = false; private boolean prevBatchWasFull = false; private boolean hasMoreIncoming = true; @@ -119,6 +126,7 @@ 
public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> this.context = context; this.outgoingContainer = new VectorContainer(); this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders()); + this.config = config; } private RawFragmentBatch getNext(RawFragmentBatchProvider provider) throws IOException{ @@ -437,15 +445,49 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } @Override - public void kill() { - cleanup(); - for (RawFragmentBatchProvider provider : fragProviders) { - provider.kill(context); + public void kill(boolean sendUpstream) { + if (sendUpstream) { + informSenders(); + } else { + cleanup(); + for (RawFragmentBatchProvider provider : fragProviders) { + provider.kill(context); + } + } + } + + private void informSenders() { + FragmentHandle handlePrototype = FragmentHandle.newBuilder() + .setMajorFragmentId(config.getOppositeMajorFragmentId()) + .setQueryId(context.getHandle().getQueryId()) + .build(); + for (int i = 0; i < config.getNumSenders(); i++) { + FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype) + .setMinorFragmentId(i) + .build(); + FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder() + .setReceiver(context.getHandle()) + .setSender(sender) + .build(); + context.getControlTunnel(config.getProvidingEndpoints().get(i)).informReceiverFinished(new OutcomeListener(), finishedReceiver); + } + } + + private class OutcomeListener implements RpcOutcomeListener<Ack> { + + @Override + public void failed(RpcException ex) { + logger.warn("Failed to inform upstream that receiver is finished"); + } + + @Override + public void success(Ack value, ByteBuf buffer) { + // Do nothing } } @Override - protected void killIncoming() { + protected void killIncoming(boolean sendUpstream) { //No op } 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index f677e54..45f32cf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -300,7 +300,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart partitionVectors.add(w.getValueVector()); } } catch (ClassTransformationException | IOException | SchemaChangeException | InterruptedException ex) { - kill(); + kill(false); logger.error("Failure while building final partition table.", ex); context.fail(ex); return false; @@ -419,8 +419,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } @Override - protected void killIncoming() { - incoming.kill(); + protected void killIncoming(boolean sendUpstream) { + incoming.kill(sendUpstream); } @Override @@ -441,7 +441,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // Must set up a new schema each time, because ValueVectors are not reused between containers in queue setupNewSchema(vc); } catch (SchemaChangeException ex) { - kill(); + kill(false); logger.error("Failure during query", ex); context.fail(ex); return IterOutcome.STOP; @@ -474,7 +474,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart try { setupNewSchema(vc); } catch (SchemaChangeException ex) { - kill(); + kill(false); 
logger.error("Failure during query", ex); context.fail(ex); return IterOutcome.STOP; @@ -504,7 +504,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart try { setupNewSchema(incoming); } catch (SchemaChangeException ex) { - kill(); + kill(false); logger.error("Failure during query", ex); context.fail(ex); return IterOutcome.STOP; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionOutgoingBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionOutgoingBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionOutgoingBatch.java new file mode 100644 index 0000000..71a1590 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionOutgoingBatch.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.partitionsender; + + +public interface PartitionOutgoingBatch { + + public long getTotalRecords(); + + public void terminate(); +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 69be256..14cf092 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -19,6 +19,8 @@ package org.apache.drill.exec.physical.impl.partitionsender; import java.io.IOException; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerArray; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; @@ -31,22 +33,16 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; -import org.apache.drill.exec.ops.OperatorContext; -import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.BaseRootExec; import org.apache.drill.exec.physical.impl.SendingAccountor; -import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import 
org.apache.drill.exec.proto.UserBitShared.MetricValue; -import org.apache.drill.exec.proto.UserBitShared.OperatorProfile; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.FragmentWritableBatch; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.rpc.data.DataTunnel; import org.apache.drill.exec.vector.CopyUtil; @@ -66,6 +62,10 @@ public class PartitionSenderRootExec extends BaseRootExec { private final int outGoingBatchCount; private final HashPartitionSender popConfig; private final StatusHandler statusHandler; + + private final AtomicIntegerArray remainingReceivers; + private final AtomicInteger remaingReceiverCount; + private volatile boolean done = false; long minReceiverRecordCount = Long.MAX_VALUE; long maxReceiverRecordCount = Long.MIN_VALUE; @@ -94,6 +94,17 @@ public class PartitionSenderRootExec extends BaseRootExec { this.outGoingBatchCount = operator.getDestinations().size(); this.popConfig = operator; this.statusHandler = new StatusHandler(sendCount, context); + this.remainingReceivers = new AtomicIntegerArray(outGoingBatchCount); + this.remaingReceiverCount = new AtomicInteger(outGoingBatchCount); + } + + private boolean done() { + for (int i = 0; i < remainingReceivers.length(); i++) { + if (remainingReceivers.get(i) == 0) { + return false; + } + } + return true; } @Override @@ -106,7 +117,13 @@ public class PartitionSenderRootExec extends BaseRootExec { return false; } - RecordBatch.IterOutcome out = next(incoming); + IterOutcome out; + if (!done) { + out = next(incoming); + } else { + incoming.kill(true); + out = IterOutcome.NONE; + } logger.debug("Partitioner.next(): got next record batch with status {}", out); switch(out){ @@ -119,7 +136,7 @@ public 
class PartitionSenderRootExec extends BaseRootExec { sendEmptyBatch(); } } catch (IOException e) { - incoming.kill(); + incoming.kill(false); logger.error("Error while creating partitioning sender or flushing outgoing batches", e); context.fail(e); } @@ -140,12 +157,12 @@ public class PartitionSenderRootExec extends BaseRootExec { } createPartitioner(); } catch (IOException e) { - incoming.kill(); + incoming.kill(false); logger.error("Error while flushing outgoing batches", e); context.fail(e); return false; } catch (SchemaChangeException e) { - incoming.kill(); + incoming.kill(false); logger.error("Error while setting up partitioner", e); context.fail(e); return false; @@ -155,7 +172,7 @@ public class PartitionSenderRootExec extends BaseRootExec { partitioner.partitionBatch(incoming); } catch (IOException e) { context.fail(e); - incoming.kill(); + incoming.kill(false); return false; } for (VectorWrapper<?> v : incoming) { @@ -206,9 +223,9 @@ public class PartitionSenderRootExec extends BaseRootExec { } } - public void updateStats(List<? extends PartitionStatsBatch> outgoing) { + public void updateStats(List<? 
extends PartitionOutgoingBatch> outgoing) { long records = 0; - for (PartitionStatsBatch o : outgoing) { + for (PartitionOutgoingBatch o : outgoing) { long totalRecords = o.getTotalRecords(); minReceiverRecordCount = Math.min(minReceiverRecordCount, totalRecords); maxReceiverRecordCount = Math.max(maxReceiverRecordCount, totalRecords); @@ -220,6 +237,18 @@ public class PartitionSenderRootExec extends BaseRootExec { stats.setLongStat(Metric.MAX_RECORDS, maxReceiverRecordCount); stats.setLongStat(Metric.N_RECEIVERS, outgoing.size()); } + + @Override + public void receivingFragmentFinished(FragmentHandle handle) { + int id = handle.getMinorFragmentId(); + if (remainingReceivers.compareAndSet(id, 0, 1)) { + partitioner.getOutgoingBatches().get(handle.getMinorFragmentId()).terminate(); + int remaining = remaingReceiverCount.decrementAndGet(); + if (remaining == 0) { + done = true; + } + } + } public void stop() { logger.debug("Partition sender stopping."); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java deleted file mode 100644 index 85ccffb..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.physical.impl.partitionsender; - - -public interface PartitionStatsBatch { - - public long getTotalRecords(); -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java index 53528ba..c5fe154 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java @@ -25,7 +25,6 @@ import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.physical.impl.partitionsender.PartitionStatsBatch; import java.io.IOException; import java.util.List; @@ -44,7 +43,7 @@ public interface Partitioner { public abstract void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException; public abstract void initialize(); public 
abstract void clear(); - public abstract List<? extends PartitionStatsBatch> getOutgoingBatches(); + public abstract List<? extends PartitionOutgoingBatch> getOutgoingBatches(); public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index fcbd954..3141aed 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -51,7 +51,6 @@ import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.rpc.data.DataTunnel; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.allocator.VectorAllocator; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -71,7 +70,7 @@ public abstract class PartitionerTemplate implements Partitioner { } @Override - public List<? extends PartitionStatsBatch> getOutgoingBatches() { + public List<? 
extends PartitionOutgoingBatch> getOutgoingBatches() { return outgoingBatches; } @@ -203,7 +202,7 @@ public abstract class PartitionerTemplate implements Partitioner { public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException; public abstract int doEval(@Named("inIndex") int inIndex); - public class OutgoingRecordBatch implements PartitionStatsBatch, VectorAccessible { + public class OutgoingRecordBatch implements PartitionOutgoingBatch, VectorAccessible { private final DataTunnel tunnel; private final HashPartitionSender operator; @@ -214,6 +213,8 @@ public abstract class PartitionerTemplate implements Partitioner { private final int oppositeMinorFragmentId; private boolean isLast = false; + private volatile boolean terminated = false; + private boolean dropAll = false; private BatchSchema outSchema; private int recordCount; private int totalRecords; @@ -247,6 +248,11 @@ public abstract class PartitionerTemplate implements Partitioner { return false; } + @Override + public void terminate() { + terminated = true; + } + @RuntimeOverridden protected void doSetup(@Named("incoming") RecordBatch incoming, @Named("outgoing") VectorAccessible outgoing) {}; @@ -254,9 +260,13 @@ public abstract class PartitionerTemplate implements Partitioner { protected boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex) { return false; }; public void flush() throws IOException { - final ExecProtos.FragmentHandle handle = context.getHandle(); + if (dropAll) { + vectorContainer.zeroVectors(); + return; + } + final FragmentHandle handle = context.getHandle(); - if (recordCount != 0) { + if (recordCount != 0 && !terminated) { for(VectorWrapper<?> w : vectorContainer){ w.getValueVector().getMutator().setValueCount(recordCount); @@ -280,9 +290,9 @@ public abstract class PartitionerTemplate implements Partitioner { 
this.sendCount.increment(); } else { logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : "")); - if (isLast) { + if (isLast || terminated) { // send final (empty) batch - FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast, + FragmentWritableBatch writableBatch = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), @@ -296,7 +306,8 @@ public abstract class PartitionerTemplate implements Partitioner { stats.stopWait(); } this.sendCount.increment(); - vectorContainer.clear(); + vectorContainer.zeroVectors(); + dropAll = true; return; } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java index 2dae502..f091aa9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java @@ -182,7 +182,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { } @Override - protected void killIncoming() { + protected void killIncoming(boolean sendUpstream) { producer.interrupt(); stop = true; try { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java 
index f21673d..dbb547d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java @@ -74,11 +74,6 @@ public class SortBatch extends AbstractRecordBatch<Sort> { } @Override - public void kill() { - incoming.kill(); - } - - @Override public SelectionVector2 getSelectionVector2() { throw new UnsupportedOperationException(); } @@ -148,7 +143,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> { return IterOutcome.OK_NEW_SCHEMA; }catch(SchemaChangeException | ClassTransformationException | IOException ex){ - kill(); + kill(false); logger.error("Failure during query", ex); context.fail(ex); return IterOutcome.STOP; @@ -209,8 +204,8 @@ public class SortBatch extends AbstractRecordBatch<Sort> { } @Override - protected void killIncoming() { - incoming.kill(); + protected void killIncoming(boolean sendUpstream) { + incoming.kill(sendUpstream); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java index ddee38a..1f2f843 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java @@ -59,21 +59,21 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { } @Override - public void kill() { + public void kill(boolean sendUpstream) { if(current != null){ - current.kill(); + current.kill(sendUpstream); current = null; } for(;incomingIterator.hasNext();){ - incomingIterator.next().kill(); + 
incomingIterator.next().kill(sendUpstream); } } @Override - protected void killIncoming() { + protected void killIncoming(boolean sendUpstream) { for (int i = 0; i < incoming.size(); i++) { RecordBatch in = incoming.get(i); - in.kill(); + in.kill(sendUpstream); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index 79669fa..16a68b8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.unorderedreceiver; import java.io.IOException; import java.util.Iterator; +import io.netty.buffer.ByteBuf; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; @@ -28,6 +29,9 @@ import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.ops.OpProfileDef; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.UnorderedReceiver; +import org.apache.drill.exec.proto.BitControl.FinishedReceiver; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RawFragmentBatch; @@ -40,6 +44,9 @@ import 
org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.control.ControlTunnel.ReceiverFinished; public class UnorderedReceiverBatch implements RecordBatch { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class); @@ -50,6 +57,7 @@ public class UnorderedReceiverBatch implements RecordBatch { private BatchSchema schema; private OperatorStats stats; private boolean first = true; + private UnorderedReceiver config; public enum Metric implements MetricDef { BYTES_RECEIVED, @@ -70,6 +78,7 @@ public class UnorderedReceiverBatch implements RecordBatch { this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 1), null); this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders()); + this.config = config; } @Override @@ -88,8 +97,12 @@ public class UnorderedReceiverBatch implements RecordBatch { } @Override - public void kill() { - fragProvider.kill(context); + public void kill(boolean sendUpstream) { + if (sendUpstream) { + informSenders(); + } else { + fragProvider.kill(context); + } } @Override @@ -188,4 +201,34 @@ public class UnorderedReceiverBatch implements RecordBatch { throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); } + private void informSenders() { + FragmentHandle handlePrototype = FragmentHandle.newBuilder() + .setMajorFragmentId(config.getOppositeMajorFragmentId()) + .setQueryId(context.getHandle().getQueryId()) + .build(); + for (int i = 0; i < config.getNumSenders(); i++) { + FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype) + 
.setMinorFragmentId(i) + .build(); + FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder() + .setReceiver(context.getHandle()) + .setSender(sender) + .build(); + context.getControlTunnel(config.getProvidingEndpoints().get(i)).informReceiverFinished(new OutcomeListener(), finishedReceiver); + } + } + + private class OutcomeListener implements RpcOutcomeListener<Ack> { + + @Override + public void failed(RpcException ex) { + logger.warn("Failed to inform upstream that receiver is finished"); + } + + @Override + public void success(Ack value, ByteBuf buffer) { + // Do nothing + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java index 20e4de4..14110e3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java @@ -82,8 +82,8 @@ public class IteratorValidatorBatchIterator implements RecordBatch { } @Override - public void kill() { - incoming.kill(); + public void kill(boolean sendUpstream) { + incoming.kill(sendUpstream); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 08219a1..d4b1001 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -132,8 +132,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } @Override - public void kill() { - incoming.kill(); + public void kill(boolean sendUpstream) { + incoming.kill(sendUpstream); } @Override @@ -324,7 +324,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { return IterOutcome.OK_NEW_SCHEMA; }catch(SchemaChangeException | ClassTransformationException | IOException ex){ - kill(); + kill(false); logger.error("Failure during query", ex); context.fail(ex); return IterOutcome.STOP; @@ -577,8 +577,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } @Override - protected void killIncoming() { - incoming.kill(); + protected void killIncoming(boolean sendUpstream) { + incoming.kill(sendUpstream); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java index fd584cb..9313018 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java @@ -44,7 +44,7 @@ public class PlannerSettings implements FrameworkContext{ public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", true); public static final OptionValidator 
BROADCAST_THRESHOLD = new PositiveLongValidator("planner.broadcast_threshold", MAX_BROADCAST_THRESHOLD, 1000000); public static final OptionValidator JOIN_ROW_COUNT_ESTIMATE_FACTOR = new RangeDoubleValidator("planner.join.row_count_estimate_factor", 0, 100, 1.0d); - public static final OptionValidator PRODUCER_CONSUMER = new BooleanValidator("planner.add_producer_consumer", true); + public static final OptionValidator PRODUCER_CONSUMER = new BooleanValidator("planner.add_producer_consumer", false); public static final OptionValidator PRODUCER_CONSUMER_QUEUE_SIZE = new LongValidator("planner.producer_consumer_queue_size", 10); public static final OptionValidator HASH_SINGLE_KEY = new BooleanValidator("planner.enable_hash_single_key", true); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index 088b120..e8ad311 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -106,11 +106,11 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements } @Override - public void kill() { - killIncoming(); + public void kill(boolean sendUpstream) { + killIncoming(sendUpstream); } - protected abstract void killIncoming(); + protected abstract void killIncoming(boolean sendUpstream); public void cleanup(){ container.clear(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java index 721755d..bea7bbf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java @@ -36,8 +36,8 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte } @Override - protected void killIncoming() { - incoming.kill(); + protected void killIncoming(boolean sendUpstream) { + incoming.kill(sendUpstream); } @Override @@ -65,7 +65,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte stats.startSetup(); setupNewSchema(); }catch(SchemaChangeException ex){ - kill(); + kill(false); logger.error("Failure during query", ex); context.fail(ex); return IterOutcome.STOP; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index 7617d91..9b28179 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -79,7 +79,7 @@ public interface RecordBatch extends VectorAccessible { * Inform child nodes that this query should be terminated. Child nodes should utilize the QueryContext to determine * what has happened. 
*/ - public void kill(); + public void kill(boolean sendUpstream); public abstract SelectionVector2 getSelectionVector2(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java index b398e47..9953e5f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc.control; import org.apache.drill.exec.proto.BitControl.BitControlHandshake; +import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.BitControl.RpcType; @@ -37,6 +38,7 @@ public class ControlRpcConfig { .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class) .add(RpcType.REQ_INIATILIZE_FRAGMENT, PlanFragment.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class) + .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class) .build(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java index 9a26039..d035c10 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.rpc.control; +import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.BitControl.RpcType; @@ -56,6 +57,11 @@ public class ControlTunnel { manager.runCommand(b); } + public void informReceiverFinished(RpcOutcomeListener<Ack> outcomeListener, FinishedReceiver finishedReceiver){ + ReceiverFinished b = new ReceiverFinished(outcomeListener, finishedReceiver); + manager.runCommand(b); + } + public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){ SendFragmentStatus b = new SendFragmentStatus(status); manager.runCommand(b); @@ -84,6 +90,21 @@ public class ControlTunnel { } + + public static class ReceiverFinished extends ListeningCommand<Ack, ControlConnection> { + final FinishedReceiver finishedReceiver; + + public ReceiverFinished(RpcOutcomeListener<Ack> listener, FinishedReceiver finishedReceiver) { + super(listener); + this.finishedReceiver = finishedReceiver; + } + + @Override + public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { + connection.send(outcomeListener, RpcType.REQ_RECEIVER_FINISHED, finishedReceiver, Ack.class); + } + } + public static class CancelFragment extends ListeningCommand<Ack, ControlConnection> { final FragmentHandle handle; @@ -127,5 +148,4 @@ public class ControlTunnel { connection.send(outcomeListener, RpcType.REQ_QUERY_STATUS, queryId, QueryProfile.class); } } - } 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java index afd3fa2..893aec8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java @@ -30,6 +30,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.impl.ImplCreator; import org.apache.drill.exec.physical.impl.RootExec; +import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.BitControl.RpcType; @@ -74,6 +75,11 @@ public class ControlHandlerImpl implements ControlMessageHandler { cancelFragment(handle); return DataRpcConfig.OK; + case RpcType.REQ_RECEIVER_FINISHED_VALUE: + FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER); + receivingFragmentFinished(finishedReceiver); + return DataRpcConfig.OK; + case RpcType.REQ_FRAGMENT_STATUS_VALUE: bee.getContext().getWorkBus().status( get(pBody, FragmentStatus.PARSER)); // TODO: Support a type of message that has no response. 
@@ -159,6 +165,22 @@ public class ControlHandlerImpl implements ControlMessageHandler { return Acks.OK; } + public Ack receivingFragmentFinished(FinishedReceiver finishedReceiver) { + FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender()); + + FragmentExecutor executor; + if(manager != null) { + executor = manager.getRunnable(); + } else { + // no fragment manager on the work bus: look up the sender's fragment runner via bee instead (nothing is cancelled here). + executor = bee.getFragmentRunner(finishedReceiver.getSender()); + } + if (executor != null) { + executor.receivingFragmentFinished(finishedReceiver.getReceiver()); + } + + return Acks.OK; + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index 735e663..c5c08e2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -25,6 +25,7 @@ import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.impl.ImplCreator; import org.apache.drill.exec.physical.impl.RootExec; import org.apache.drill.exec.proto.BitControl.FragmentStatus; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.FragmentState; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; @@ -71,6 +72,10 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid } } + public void receivingFragmentFinished(FragmentHandle handle) { + root.receivingFragmentFinished(handle); + } + public UserClientConnection 
getClient(){ return context.getConnection(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c331aed8/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java index 7dce6e0..db8ff8e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.TypedFieldId; @@ -85,6 +86,11 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector>{ } @Override + public void receivingFragmentFinished(FragmentHandle handle) { + //no op + } + + @Override public Iterator<ValueVector> iterator() { List<ValueVector> vv = Lists.newArrayList(); for(VectorWrapper<?> vw : incoming){
