DRILL-1382: Fast schema return
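DRILL-1382 threads a new buildSchema() pass through the operator tree: before any rows are read, the root operator primes its children and immediately ships an empty, schema-only batch downstream, so clients learn the result schema without waiting for the first data batch. The following is a minimal sketch of that handshake, not Drill's actual API; every type here is a simplified stand-in (the real signatures are in the RootExec, RecordBatch, ScreenCreator, and SingleSenderCreator hunks below).

// Illustrative stand-ins only; see the diff below for the real Drill classes.
public class FastSchemaSketch {

  static class SchemaChangeException extends Exception {}

  // Stand-in for Drill's BatchSchema.
  static final class BatchSchema {
    final String description;
    BatchSchema(String description) { this.description = description; }
  }

  // Stand-in for RecordBatch: DRILL-1382 adds buildSchema(), a one-time pass
  // that materializes each operator's output schema before any rows are fetched.
  interface RecordSource {
    void buildSchema() throws SchemaChangeException;
    BatchSchema getSchema();
  }

  // Stand-in for the user connection / data tunnel at the fragment root.
  interface SchemaSink {
    void sendEmptyBatch(BatchSchema schema); // rowCount = 0, not the last chunk
  }

  // Mirrors the shape of ScreenCreator.buildSchema() and
  // SingleSenderCreator.buildSchema(): prime the child subtree, then
  // immediately send an empty batch carrying only the schema.
  static void buildSchemaAtRoot(RecordSource incoming, SchemaSink sink)
      throws SchemaChangeException {
    incoming.buildSchema();                    // recurse down the operator tree
    sink.sendEmptyBatch(incoming.getSchema()); // client sees the schema fast
  }

  public static void main(String[] args) throws SchemaChangeException {
    RecordSource scan = new RecordSource() {
      public void buildSchema() { /* a scan reads just enough to learn its schema */ }
      public BatchSchema getSchema() { return new BatchSchema("a INT, b VARCHAR"); }
    };
    buildSchemaAtRoot(scan, s -> System.out.println("schema-only batch: " + s.description));
  }
}

The per-operator buildSchema() implementations in the diff all follow that same shape: recurse into the incoming batch, materialize the output container's schema, and return OK_NEW_SCHEMA.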
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/63d3008e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/63d3008e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/63d3008e

Branch: refs/heads/master
Commit: 63d3008e120a12d9167208a6db0faa950f9a618b
Parents: 451dd60
Author: Steven Phillips <[email protected]>
Authored: Fri Oct 10 17:03:19 2014 -0700
Committer: Steven Phillips <[email protected]>
Committed: Mon Oct 27 06:37:26 2014 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/memory/Accountor.java |  13 +-
 .../drill/exec/physical/impl/RootExec.java      |   6 +
 .../drill/exec/physical/impl/ScanBatch.java     |  44 ++++--
 .../drill/exec/physical/impl/ScreenCreator.java |  43 ++++--
 .../exec/physical/impl/SingleSenderCreator.java |  31 ++++
 .../impl/TopN/PriorityQueueTemplate.java        |   1 +
 .../exec/physical/impl/TopN/TopNBatch.java      |  47 ++++--
 .../exec/physical/impl/WriterRecordBatch.java   |  53 ++++---
 .../physical/impl/aggregate/HashAggBatch.java   |  44 ++++--
 .../impl/aggregate/StreamingAggBatch.java       |  24 +++-
 .../impl/aggregate/StreamingAggTemplate.java    |   2 +-
 .../BroadcastSenderRootExec.java                |  29 ++++
 .../exec/physical/impl/common/HashTable.java    |   2 +
 .../physical/impl/common/HashTableTemplate.java |   9 +-
 .../physical/impl/filter/FilterRecordBatch.java |  17 ++-
 .../exec/physical/impl/join/HashJoinBatch.java  |  85 ++++++-----
 .../impl/join/HashJoinProbeTemplate.java        |  14 +-
 .../exec/physical/impl/join/JoinStatus.java     |   8 ++
 .../exec/physical/impl/join/MergeJoinBatch.java | 144 ++++++++++---------
 .../physical/impl/limit/LimitRecordBatch.java   |  38 ++---
 .../impl/mergereceiver/MergingRecordBatch.java  |  42 +++---
 .../PartitionSenderRootExec.java                |  21 +++
 .../partitionsender/PartitionerTemplate.java    |  12 +-
 .../impl/producer/ProducerConsumerBatch.java    |  25 ++++
 .../impl/project/ProjectRecordBatch.java        |  53 +++++--
 .../impl/svremover/RemovingRecordBatch.java     |  36 +++--
 .../physical/impl/trace/TraceRecordBatch.java   |   4 +-
 .../impl/union/UnionAllRecordBatch.java         |  15 ++
 .../UnorderedReceiverBatch.java                 |   5 +
 .../IteratorValidatorBatchIterator.java         |  19 ++-
 .../window/StreamingWindowFrameRecordBatch.java |  17 ++-
 .../physical/impl/xsort/ExternalSortBatch.java  |  34 ++++-
 .../drill/exec/record/AbstractRecordBatch.java  |   6 +
 .../exec/record/AbstractSingleRecordBatch.java  |  16 ++-
 .../exec/record/ExpandableHyperContainer.java   |  14 ++
 .../exec/record/FragmentWritableBatch.java      |  11 ++
 .../apache/drill/exec/record/RecordBatch.java   |   9 ++
 .../drill/exec/record/VectorContainer.java      |  56 +++++++-
 .../exec/work/fragment/FragmentExecutor.java    |   1 +
 .../java/org/apache/drill/BaseTestQuery.java    |   4 +-
 .../org/apache/drill/TestExampleQueries.java    |   2 +-
 .../exec/fn/impl/TestAggregateFunction.java     |   2 +-
 .../drill/exec/fn/impl/TestDateFunctions.java   |   2 +-
 .../drill/exec/fn/impl/TestMultiInputAdd.java   |   2 +-
 .../exec/fn/impl/TestNewAggregateFunctions.java |   2 +-
 .../exec/fn/impl/TestNewMathFunctions.java      |   3 +-
 .../exec/physical/impl/SimpleRootExec.java      |  15 ++
 .../exec/physical/impl/TestCastFunctions.java   |   2 +-
 .../physical/impl/TestCastVarCharToBigInt.java  |   2 +-
 .../drill/exec/physical/impl/TestDecimal.java   |  12 +-
 .../physical/impl/TestExtractFunctions.java     |   2 +-
 .../impl/TestImplicitCastFunctions.java         |   1 +
 .../physical/impl/TestSimpleFragmentRun.java    |   8 +-
 .../exec/physical/impl/TestStringFunctions.java |   1 +
 .../exec/physical/impl/TopN/TestSimpleTopN.java |   2 +-
 .../exec/physical/impl/join/TestHashJoin.java   |   4 +-
 .../exec/physical/impl/sort/TestSimpleSort.java |   2 +
 .../physical/impl/window/TestWindowFrame.java   |  12 +-
 .../exec/physical/impl/writer/TestWriter.java   |   2 +-
 .../drill/exec/record/vector/TestDateTypes.java |  12 +-
 .../vector/complex/writer/TestJsonReader.java   |   8 +-
 .../java-exec/src/test/resources/agg/test1.json |   2 +-
 .../src/test/resources/agg/twokey.json          |   2 +-
 .../decimal/test_decimal_sort_complex.json      |   2 +-
 .../functions/cast/testICastConstant.json       |   4 +-
 .../functions/date/interval_arithmetic.json     |   2 +-
 .../functions/string/testRegexpReplace.json     |   4 +-
 .../src/test/resources/join/join_batchsize.json |   4 +-
 .../test/resources/join/mj_multi_condition.json |   4 +-
 .../resources/mergerecv/merging_receiver.json   |   2 +-
 .../resources/mergerecv/multiple_providers.json |   2 +-
 .../resources/record/vector/test_sort_date.json |   2 +-
 .../src/test/resources/window/oneKeyCount.json  |   2 +-
 .../resources/window/oneKeyCountMultiBatch.json |   2 +-
 .../src/test/resources/window/twoKeys.json      |   2 +-
 .../apache/drill/jdbc/test/TestJdbcQuery.java   |   1 +
 76 files changed, 841 insertions(+), 348 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
index 67beb95..a86367f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
@@ -356,13 +356,14 @@ public class Accountor {
       sb.append("at stack location:\n");
       entry.addToString(sb);
     }
-    IllegalStateException e = new IllegalStateException(sb.toString());
-    if (errorOnLeak) {
-      throw e;
-    } else {
-      logger.warn("Memory leaked.", e);
+    if (!buffers.isEmpty()) {
+      IllegalStateException e = new IllegalStateException(sb.toString());
+      if (errorOnLeak) {
+        throw e;
+      } else {
+        logger.warn("Memory leaked.", e);
+      }
     }
-    }

     remainder.close();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index 4250e27..d9c4e5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.physical.impl;

+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;

 /**
@@ -28,6 +29,11 @@ public interface RootExec {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootExec.class);

   /**
+   * Generate and send empty schema batch
+   */
+  public void buildSchema() throws SchemaChangeException;
+
+  /**
    * Do the next batch of work.
    * @return Whether or not additional batches of work are necessary. False means that this fragment is done.
*/ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index fc23441..ac65e40 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -65,6 +65,7 @@ public class ScanBatch implements RecordBatch { private final Map<MaterializedField.Key, ValueVector> fieldVectorMap = Maps.newHashMap(); private final VectorContainer container = new VectorContainer(); + private VectorContainer tempContainer; private int recordCount; private final FragmentContext context; private final OperatorContext oContext; @@ -77,7 +78,7 @@ public class ScanBatch implements RecordBatch { private List<ValueVector> partitionVectors; private List<Integer> selectedPartitionColumns; private String partitionColumnDesignator; - private boolean first = true; + private boolean first = false; private boolean done = false; public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException { @@ -117,6 +118,22 @@ public class ScanBatch implements RecordBatch { } @Override + public IterOutcome buildSchema() { + IterOutcome outcome = next(); + if (outcome == IterOutcome.NONE) { + container.buildSchema(SelectionVectorMode.NONE); + schema = container.getSchema(); + done = true; + } + first = true; + tempContainer = VectorContainer.getTransferClone(container); + for (VectorWrapper w : container) { + w.getValueVector().allocateNew(); + } + return IterOutcome.OK_NEW_SCHEMA; + } + + @Override public int getRecordCount() { return recordCount; } @@ -138,11 +155,24 @@ public class ScanBatch implements RecordBatch { container.zeroVectors(); } + private void transfer() { + container.zeroVectors(); + for (VectorWrapper w : tempContainer) { + MaterializedField field = w.getField(); + w.getValueVector().makeTransferPair(container.addOrGet(field)).transfer(); + } + } + @Override public IterOutcome next() { if (done) { return IterOutcome.NONE; } + if (first) { + first = false; + transfer(); + return IterOutcome.OK; + } long t1 = System.nanoTime(); oContext.getStats().startProcessing(); try { @@ -159,14 +189,6 @@ public class ScanBatch implements RecordBatch { try { if (!readers.hasNext()) { currentReader.cleanup(); - if (first) { - first = false; - done = true; - populatePartitionVectors(); - container.buildSchema(SelectionVectorMode.NONE); - schema = container.getSchema(); - return IterOutcome.OK_NEW_SCHEMA; - } releaseAssets(); return IterOutcome.NONE; } @@ -196,7 +218,6 @@ public class ScanBatch implements RecordBatch { return IterOutcome.STOP; } } - first = false; populatePartitionVectors(); if (mutator.isNewSchema()) { @@ -349,6 +370,9 @@ public class ScanBatch implements RecordBatch { public void cleanup() { container.clear(); + if (tempContainer != null) { + tempContainer.clear(); + } for (ValueVector v : partitionVectors) { v.clear(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index 868eb6e..3a843ea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; @@ -85,6 +86,30 @@ public class ScreenCreator implements RootCreator<Screen>{ this.connection = context.getConnection(); } + @Override + public void buildSchema() throws SchemaChangeException { + stats.startProcessing(); + try { + stats.stopProcessing(); + try { + incoming.buildSchema(); + } finally { + stats.startProcessing(); + } + + QueryWritableBatch batch = QueryWritableBatch.getEmptyBatchWithSchema(context.getHandle().getQueryId(), 0, false, incoming.getSchema()); + stats.startWait(); + try { + connection.sendResult(listener, batch); + } finally { + stats.stopWait(); + } + sendCount.increment(); + } finally { + stats.stopProcessing(); + } + materializer = new VectorRecordMaterializer(context, incoming); + } @Override public boolean innerNext() { @@ -123,17 +148,13 @@ public class ScreenCreator implements RootCreator<Screen>{ case NONE: { this.internalStop(); QueryWritableBatch batch; - if (!first) { - QueryResult header = QueryResult.newBuilder() // - .setQueryId(context.getHandle().getQueryId()) // - .setRowCount(0) // - .setDef(RecordBatchDef.getDefaultInstance()) // - .setIsLastChunk(true) // - .build(); - batch = new QueryWritableBatch(header); - } else { - batch = QueryWritableBatch.getEmptyBatchWithSchema(context.getHandle().getQueryId(), 0, true, incoming.getSchema()); - } + QueryResult header = QueryResult.newBuilder() // + .setQueryId(context.getHandle().getQueryId()) // + .setRowCount(0) // + .setDef(RecordBatchDef.getDefaultInstance()) // + .setIsLastChunk(true) // + .build(); + batch = new QueryWritableBatch(header); stats.startWait(); try { connection.sendResult(listener, batch); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java index 34196b7..b638de0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java @@ -22,11 +22,14 @@ import io.netty.buffer.ByteBuf; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.ops.OperatorContext; import 
org.apache.drill.exec.physical.config.SingleSender; +import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; +import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.record.FragmentWritableBatch; @@ -52,6 +55,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ private RecordBatch incoming; private DataTunnel tunnel; private FragmentHandle handle; + private SingleSender config; private int recMajor; private FragmentContext context; private volatile boolean ok = true; @@ -73,6 +77,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ this.incoming = batch; assert(incoming != null); this.handle = context.getHandle(); + this.config = config; this.recMajor = config.getOppositeMajorFragmentId(); FragmentHandle opposite = handle.toBuilder().setMajorFragmentId(config.getOppositeMajorFragmentId()).setMinorFragmentId(0).build(); this.tunnel = context.getDataTunnel(config.getDestination(), opposite); @@ -80,6 +85,32 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ } @Override + public void buildSchema() throws SchemaChangeException { + stats.startProcessing(); + try { + stats.stopProcessing(); + try { + incoming.buildSchema(); + } finally { + stats.startProcessing(); + } + + FragmentWritableBatch batch = FragmentWritableBatch.getEmptyBatchWithSchema(handle.getQueryId(), + handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), 0, incoming.getSchema()); + + stats.startWait(); + try { + tunnel.sendRecordBatch(new RecordSendFailure(), batch); + } finally { + stats.stopWait(); + } + sendCount.increment(); + } finally { + stats.stopProcessing(); + } + } + + @Override public boolean innerNext() { if (!ok) { incoming.kill(false); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java index e0a4c92..369c0ec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java @@ -77,6 +77,7 @@ public abstract class PriorityQueueTemplate implements PriorityQueue { heapSv4.set(i, v4.get(i)); } v4.clear(); + doSetup(context, hyperBatch, null); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 473e3a3..400a867 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -110,15 +110,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { } @Override - public BatchSchema getSchema() { - 
List<MaterializedField> fields = Lists.newArrayList(); - for (MaterializedField field : incoming.getSchema()) { - fields.add(field); - } - return BatchSchema.newBuilder().addFields(fields).setSelectionVectorMode(SelectionVectorMode.FOUR_BYTE).build(); - } - - @Override public void cleanup() { if (sv4 != null) { sv4.clear(); @@ -131,6 +122,32 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { } @Override + public IterOutcome buildSchema() throws SchemaChangeException { + VectorContainer c = new VectorContainer(oContext); + stats.startProcessing(); + try { + stats.stopProcessing(); + try { + incoming.buildSchema(); + } finally { + stats.startProcessing(); + } + for (VectorWrapper w : incoming) { + c.addOrGet(w.getField()); + } + c = VectorContainer.canonicalize(c); + for (VectorWrapper w : c) { + container.add(w.getValueVector()); + } + container.buildSchema(SelectionVectorMode.NONE); + container.setRecordCount(0); + return IterOutcome.OK_NEW_SCHEMA; + } finally { + stats.stopProcessing(); + } + } + + @Override public IterOutcome innerNext() { if (schema != null) { if (getSelectionVector4().next()) { @@ -146,6 +163,10 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { Stopwatch watch = new Stopwatch(); watch.start(); IterOutcome upstream = incoming.next(); + if (upstream == IterOutcome.OK && schema == null) { + upstream = IterOutcome.OK_NEW_SCHEMA; + container.clear(); + } logger.debug("Took {} us to get next", watch.elapsed(TimeUnit.MICROSECONDS)); switch (upstream) { case NONE: @@ -191,6 +212,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { priorityQueue.generate(); this.sv4 = priorityQueue.getFinalSv4(); + container.clear(); for (VectorWrapper w : priorityQueue.getHyperBatch()) { container.add(w.getValueVectors()); } @@ -210,7 +232,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { Stopwatch watch = new Stopwatch(); watch.start(); VectorContainer c = priorityQueue.getHyperBatch(); - VectorContainer newContainer = new VectorContainer(); + VectorContainer newContainer = new VectorContainer(oContext); SelectionVector4 selectionVector4 = priorityQueue.getHeapSv4(); SimpleRecordBatch batch = new SimpleRecordBatch(c, selectionVector4, context); SimpleRecordBatch newBatch = new SimpleRecordBatch(newContainer, null, context); @@ -323,6 +345,11 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { } @Override + public IterOutcome buildSchema() throws SchemaChangeException { + return null; + } + + @Override public int getRecordCount() { if (sv4 != null) { return sv4.getCount(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java index 8c1a4c0..acbb815 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; +import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import 
org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; @@ -30,6 +31,7 @@ import org.apache.drill.exec.physical.base.Writer; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorWrapper; @@ -47,7 +49,6 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { private RecordWriter recordWriter; private int counter = 0; private final RecordBatch incoming; - private boolean first = true; private boolean processed = false; private String fragmentUniqueId; @@ -71,8 +72,23 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { } @Override + public IterOutcome buildSchema() throws SchemaChangeException { + incoming.buildSchema(); + try { + stats.startProcessing(); + setupNewSchema(); + } catch (Exception e) { + throw new SchemaChangeException(e); + } finally { + stats.stopProcessing(); + } + return IterOutcome.OK_NEW_SCHEMA; + } + + @Override public IterOutcome innerNext() { if(processed) { + cleanup(); // if the upstream record batch is already processed and next() is called by // downstream then return NONE to indicate completion return IterOutcome.NONE; @@ -82,16 +98,11 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { IterOutcome upstream; do { upstream = next(incoming); - if(first && upstream == IterOutcome.OK) { - upstream = IterOutcome.OK_NEW_SCHEMA; - } - first = false; switch(upstream) { case NOT_YET: case NONE: case STOP: - cleanup(); if (upstream == IterOutcome.STOP) { return upstream; } @@ -125,22 +136,12 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { } } while(upstream != IterOutcome.NONE); - // Create two vectors for: - // 1. Fragment unique id. - // 2. Summary: currently contains number of records written. 
- MaterializedField fragmentIdField = MaterializedField.create(SchemaPath.getSimplePath("Fragment"), Types.required(MinorType.VARCHAR)); - MaterializedField summaryField = MaterializedField.create(SchemaPath.getSimplePath("Number of records written"), Types.required(MinorType.BIGINT)); - - VarCharVector fragmentIdVector = (VarCharVector) TypeHelper.getNewVector(fragmentIdField, context.getAllocator()); - AllocationHelper.allocate(fragmentIdVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR))); - BigIntVector summaryVector = (BigIntVector) TypeHelper.getNewVector(summaryField, context.getAllocator()); - AllocationHelper.allocate(summaryVector, 1, TypeHelper.getSize(Types.required(MinorType.VARCHAR))); - - - container.add(fragmentIdVector); - container.add(summaryVector); - container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + VarCharVector fragmentIdVector = (VarCharVector) container.getValueAccessorById(VarCharVector.class, container.getValueVectorId(SchemaPath.getSimplePath("Fragment")).getFieldIds()).getValueVector(); + AllocationHelper.allocate(fragmentIdVector, 1, 50); + BigIntVector summaryVector = (BigIntVector) container.getValueAccessorById(BigIntVector.class, + container.getValueVectorId(SchemaPath.getSimplePath("Number of records written")).getFieldIds()).getValueVector(); + AllocationHelper.allocate(summaryVector, 1, 8); fragmentIdVector.getMutator().setSafe(0, fragmentUniqueId.getBytes()); fragmentIdVector.getMutator().setValueCount(1); summaryVector.getMutator().setSafe(0, counter); @@ -157,6 +158,15 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { // update the schema in RecordWriter stats.startSetup(); recordWriter.updateSchema(incoming.getSchema()); + // Create two vectors for: + // 1. Fragment unique id. + // 2. Summary: currently contains number of records written. 
+ MaterializedField fragmentIdField = MaterializedField.create(SchemaPath.getSimplePath("Fragment"), Types.required(MinorType.VARCHAR)); + MaterializedField summaryField = MaterializedField.create(SchemaPath.getSimplePath("Number of records written"), Types.required(MinorType.BIGINT)); + + container.addOrGet(fragmentIdField); + container.addOrGet(summaryField); + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); } catch(IOException ex) { throw new RuntimeException("Failed to update schema in RecordWriter", ex); } finally{ @@ -164,6 +174,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { } eventBasedRecordWriter = new EventBasedRecordWriter(incoming, recordWriter); + container.buildSchema(SelectionVectorMode.NONE); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index c522870..a0b8d3f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -89,17 +89,45 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { } @Override + public IterOutcome buildSchema() throws SchemaChangeException { + stats.startProcessing(); + try { + stats.stopProcessing(); + try { + incoming.buildSchema(); + } finally { + stats.startProcessing(); + } + if (!createAggregator()) { + done = true; + return IterOutcome.STOP; + } + return IterOutcome.OK_NEW_SCHEMA; + } finally { + stats.stopProcessing(); + } + } + + @Override public IterOutcome innerNext() { if (done) { return IterOutcome.NONE; } // this is only called on the first batch. Beyond this, the aggregator manages batches. - if (aggregator == null) { + if (aggregator == null || first) { + first = false; + if (aggregator != null) { + aggregator.cleanup(); + } IterOutcome outcome = next(incoming); + if (outcome == IterOutcome.OK) { + outcome = IterOutcome.OK_NEW_SCHEMA; + } logger.debug("Next outcome of {}", outcome); switch (outcome) { case NONE: - throw new UnsupportedOperationException("Received NONE on first batch"); +// throw new UnsupportedOperationException("Received NONE on first batch"); + return outcome; case NOT_YET: case STOP: return outcome; @@ -110,7 +138,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { } break; case OK: - throw new IllegalStateException("You should never get a first batch without a new schema"); + break; default: throw new IllegalStateException(String.format("unknown outcome %s", outcome)); } @@ -123,11 +151,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { if (aggregator.buildComplete() && ! 
aggregator.allFlushed()) { // aggregation is complete and not all records have been output yet IterOutcome outcome = aggregator.outputCurrentBatch(); - if (outcome == IterOutcome.NONE && first) { - first = false; - done = true; - return IterOutcome.OK_NEW_SCHEMA; - } return outcome; } @@ -144,11 +167,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { // fall through case RETURN_OUTCOME: IterOutcome outcome = aggregator.getOutcome(); - if (outcome == IterOutcome.NONE && first) { - first = false; - done = true; - return IterOutcome.OK_NEW_SCHEMA; - } return aggregator.getOutcome(); case UPDATE_AGGREGATOR: aggregator = null; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index 4d3925e..17aaae8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -77,13 +77,33 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } @Override + public IterOutcome buildSchema() throws SchemaChangeException { + stats.startProcessing(); + try { + stats.stopProcessing(); + try { + incoming.buildSchema(); + } finally { + stats.startProcessing(); + } + if (!createAggregator()) { + done = true; + return IterOutcome.STOP; + } + return IterOutcome.OK_NEW_SCHEMA; + } finally { + stats.stopProcessing(); + } + } + @Override public IterOutcome innerNext() { if (done) { container.zeroVectors(); return IterOutcome.NONE; } // this is only called on the first batch. Beyond this, the aggregator manages batches. - if (aggregator == null) { + if (aggregator == null || first) { + first = false; IterOutcome outcome = next(incoming); logger.debug("Next outcome of {}", outcome); switch (outcome) { @@ -98,7 +118,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } break; case OK: - throw new IllegalStateException("You should never get a first batch without a new schema"); + break; default: throw new IllegalStateException(String.format("unknown outcome %s", outcome)); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java index c2a5715..556b260 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java @@ -55,7 +55,6 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { this.schema = incoming.getSchema(); this.outgoing = outgoing; setupInterior(incoming, outgoing); - this.currentIndex = incoming.getRecordCount() == 0 ? 
0 : this.getVectorIndex(underlyingIndex); } @@ -92,6 +91,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { // if we're in the first state, allocate outgoing. if (first) { + this.currentIndex = incoming.getRecordCount() == 0 ? 0 : this.getVectorIndex(underlyingIndex); allocateOutgoing(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java index 3c8e551..4e7d222 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java @@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf; import java.util.List; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; @@ -84,6 +85,34 @@ public class BroadcastSenderRootExec extends BaseRootExec { } @Override + public void buildSchema() throws SchemaChangeException { + stats.startProcessing(); + try { + stats.stopProcessing(); + try { + incoming.buildSchema(); + } finally { + stats.startProcessing(); + } + + FragmentWritableBatch batch = FragmentWritableBatch.getEmptyBatchWithSchema(handle.getQueryId(), + handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), 0, incoming.getSchema()); + + stats.startWait(); + for (int i = 0; i < tunnels.length; i++) { + try { + tunnels[i].sendRecordBatch(this.statusHandler, batch); + } finally { + stats.stopWait(); + } + statusHandler.sendCount.increment(); + } + } finally { + stats.stopProcessing(); + } + } + + @Override public boolean innerNext() { if(!ok) { context.fail(statusHandler.ex); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java index 6028a04..e8ccd62 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java @@ -46,6 +46,8 @@ public interface HashTable { RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig); + public void updateBatches(); + public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount); public int containsKey(int incomingRowIdx, boolean isProbe); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index 6024523..5b56f8e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -144,7 +144,7 @@ public abstract class HashTableTemplate implements HashTable { hashValues.getMutator().setValueCount(size); } - private void setup() { + protected void setup() { setupInterior(incomingBuild, incomingProbe, outgoing, htContainer); } @@ -433,6 +433,13 @@ public abstract class HashTableTemplate implements HashTable { currentIdxHolder = new IndexPointer(); } + public void updateBatches() { + doSetup(incomingBuild, incomingProbe); + for (BatchHolder batchHolder : batchHolders) { + batchHolder.setup(); + } + } + public int numBuckets() { return startIndices.getAccessor().getValueCount(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index 85f664c..7d68e07 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -77,6 +77,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ @Override protected IterOutcome doWork() { + container.zeroVectors(); int recordCount = incoming.getRecordCount(); filter.filterBatch(recordCount); // for (VectorWrapper<?> v : container) { @@ -100,15 +101,16 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ } @Override - protected void setupNewSchema() throws SchemaChangeException { - container.clear(); + protected boolean setupNewSchema() throws SchemaChangeException { if (sv2 != null) { sv2.clear(); } switch (incoming.getSchema().getSelectionVectorMode()) { case NONE: - sv2 = new SelectionVector2(oContext.getAllocator()); + if (sv2 == null) { + sv2 = new SelectionVector2(oContext.getAllocator()); + } this.filter = generateSV2Filterer(); break; case TWO_BYTE: @@ -135,6 +137,11 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ throw new UnsupportedOperationException(); } + if (container.isSchemaChanged()) { + container.buildSchema(SelectionVectorMode.TWO_BYTE); + return true; + } + return false; } protected Filterer generateSV4Filterer() throws SchemaChangeException { @@ -190,12 +197,10 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ cg.addExpr(new ReturnValueExpression(expr)); for (VectorWrapper<?> v : incoming) { - TransferPair pair = v.getValueVector().getTransferPair(); - container.add(pair.getTo()); + TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField())); transfers.add(pair); } - container.buildSchema(SelectionVectorMode.TWO_BYTE); try { TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]); 
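The FilterRecordBatch hunk just above (and the LimitRecordBatch hunk further below) shows the other pattern this commit applies across operators: setupNewSchema() now returns a boolean, output vectors are obtained with container.addOrGet(field) and wired up with makeTransferPair(...) instead of being recreated from scratch, and container.isSchemaChanged() decides whether a schema change actually needs to be announced downstream. Below is a minimal sketch of that idea using hypothetical stand-in types, not Drill's real VectorContainer:

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative stand-in for VectorContainer's addOrGet/isSchemaChanged pair.
public class ContainerSketch {

  // Stand-in for a value vector keyed by its materialized field.
  static final class Vector {
    final String field;
    Vector(String field) { this.field = field; }
  }

  private final Map<String, Vector> vectors = new LinkedHashMap<>();
  private boolean schemaChanged;

  // Reuse the existing vector when the field is already present; otherwise
  // create it and remember that the output schema changed.
  Vector addOrGet(String field) {
    Vector v = vectors.get(field);
    if (v == null) {
      v = new Vector(field);
      vectors.put(field, v);
      schemaChanged = true;
    }
    return v;
  }

  // What a boolean-returning setupNewSchema() consults: true means rebuild
  // the schema and report OK_NEW_SCHEMA, false means reuse everything.
  boolean isSchemaChanged() {
    boolean changed = schemaChanged;
    schemaChanged = false;   // simplified reset; the real semantics may differ
    return changed;
  }

  public static void main(String[] args) {
    ContainerSketch c = new ContainerSketch();
    c.addOrGet("a");
    System.out.println(c.isSchemaChanged()); // true: first setup adds a vector
    c.addOrGet("a");
    System.out.println(c.isSchemaChanged()); // false: same schema, vector reused
  }
}
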
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 2a08c05..238c992 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -33,7 +33,6 @@ import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.CodeGenerator; -import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; @@ -46,12 +45,13 @@ import org.apache.drill.exec.physical.impl.common.IndexPointer; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.ExpandableHyperContainer; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.vector.ValueVector; import org.eigenbase.rel.JoinRelType; import com.sun.codemodel.JExpr; @@ -167,6 +167,35 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { } + + @Override + public IterOutcome buildSchema() throws SchemaChangeException { + leftUpstream = left.buildSchema(); + right.buildSchema(); + // Initialize the hash join helper context + hjHelper = new HashJoinHelper(context, oContext.getAllocator()); + try { + rightSchema = right.getSchema(); + VectorContainer c = new VectorContainer(oContext); + for (MaterializedField field : rightSchema) { + c.addOrGet(field); + } + c.buildSchema(SelectionVectorMode.NONE); + c.setRecordCount(0); + hyperContainer = new ExpandableHyperContainer(c); + hjHelper.addNewBatch(0); + buildBatchIndex++; + setupHashTable(); + hashJoinProbe = setupHashJoinProbe(); + // Build the container schema and set the counts + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + container.setRecordCount(outputRecords); + } catch (IOException | ClassTransformationException e) { + throw new SchemaChangeException(e); + } + return IterOutcome.OK_NEW_SCHEMA; + } + @Override public IterOutcome innerNext() { if (done) { @@ -176,27 +205,15 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { /* If we are here for the first time, execute the build phase of the * hash join and setup the run time generated class for the probe side */ - if (hashJoinProbe == null) { - - // Initialize the hash join helper context - hjHelper = new HashJoinHelper(context, oContext.getAllocator()); - - /* Build phase requires setting up the hash table. Hash table will - * materialize both the build and probe side expressions while - * creating the hash table. 
So we need to invoke next() on our probe batch - * as well, for the materialization to be successful. This batch will not be used - * till we complete the build phase. - */ - leftUpstream = next(HashJoinHelper.LEFT_INPUT, left); - + if (first) { + first = false; // Build the hash table, using the build side record batches. executeBuildPhase(); +// IterOutcome next = next(HashJoinHelper.LEFT_INPUT, left); + hashJoinProbe.setupHashJoinProbe(context, hyperContainer, left, 0, this, hashTable, hjHelper, joinType); // Update the hash table related stats for the operator updateStats(this.hashTable); - - // Create the run time generated code needed to probe and project - hashJoinProbe = setupHashJoinProbe(); } // Store the number of records projected @@ -216,21 +233,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { if (outputRecords > 0 || first) { first = false; - // Build the container schema and set the counts - container.buildSchema(BatchSchema.SelectionVectorMode.NONE); - container.setRecordCount(outputRecords); for (VectorWrapper<?> v : container) { v.getValueVector().getMutator().setValueCount(outputRecords); } - // First output batch, return OK_NEW_SCHEMA - if (firstOutputBatch == true) { - firstOutputBatch = false; - return IterOutcome.OK_NEW_SCHEMA; - } - - // Not the first output batch return IterOutcome.OK; } } else { @@ -302,6 +309,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { //Setup the underlying hash table IterOutcome rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right); + if (hashTable == null) { + rightUpstream = IterOutcome.OK_NEW_SCHEMA; + } boolean moreData = true; @@ -324,7 +334,10 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { } setupHashTable(); } else { + if (!rightSchema.equals(right.getSchema())) { throw new SchemaChangeException("Hash join does not support schema changes"); + } + hashTable.updateBatches(); } // Fall through case OK: @@ -388,10 +401,10 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { JExpression outIndex = JExpr.direct("outIndex"); g.rotateBlock(); - if (hyperContainer != null) { - for(VectorWrapper<?> vv : hyperContainer) { + if (rightSchema != null) { + for(MaterializedField field : rightSchema) { - MajorType inputType = vv.getField().getType(); + MajorType inputType = field.getType(); MajorType outputType; if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) { outputType = Types.overrideMode(inputType, DataMode.OPTIONAL); @@ -400,10 +413,10 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { } // Add the vector to our output container - ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), context.getAllocator()); - container.add(v); +// ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), context.getAllocator()); + container.addOrGet(MaterializedField.create(field.getPath(), outputType)); - JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), true, fieldId)); + JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(field.getType(), true, fieldId)); JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId)); g.getEvalBlock()._if(outVV.invoke("copyFromSafe") .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE))) @@ -435,8 +448,7 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> { outputType = inputType; } - ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), oContext.getAllocator()); - container.add(v); + container.addOrGet(MaterializedField.create(vv.getField().getPath(), outputType)); JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId)); JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId)); @@ -453,7 +465,6 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { HashJoinProbe hj = context.getImplementationClass(cg); - hj.setupHashJoinProbe(context, hyperContainer, left, recordCount, this, hashTable, hjHelper, joinType); return hj; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java index 133289e..c58f9a3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java @@ -26,6 +26,7 @@ import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.common.HashTable; +import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorContainer; @@ -37,6 +38,10 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { // Probe side record batch private RecordBatch probeBatch; + private BatchSchema probeSchema; + + private VectorContainer buildBatch; + // Join type, INNER, LEFT, RIGHT or OUTER private JoinRelType joinType; @@ -81,6 +86,8 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { HashJoinHelper hjHelper, JoinRelType joinRelType) { this.probeBatch = probeBatch; + this.probeSchema = probeBatch.getSchema(); + this.buildBatch = buildBatch; this.joinType = joinRelType; this.recordsToProcess = probeRecordCount; this.hashTable = hashTable; @@ -135,7 +142,12 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { continue; case OK_NEW_SCHEMA: - throw new SchemaChangeException("Hash join does not support schema changes"); + if (probeBatch.getSchema().equals(probeSchema)) { + doSetup(outgoingJoinBatch.getContext(), buildBatch, probeBatch, outgoingJoinBatch); + hashTable.updateBatches(); + } else { + throw new SchemaChangeException("Hash join does not support schema changes"); + } case OK: recordsToProcess = probeBatch.getRecordCount(); recordsProcessed = 0; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java index 
39bdb94..3bc8daa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinStatus.java @@ -219,6 +219,14 @@ public final class JoinStatus { return leftPosition + 1 < left.getRecordCount(); } + public IterOutcome getLastRight() { + return lastRight; + } + + public IterOutcome getLastLeft() { + return lastLeft; + } + /** * Check if the next left record position can advance by one in the current batch. */ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 1d4e353..518971d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -51,6 +51,7 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; import org.eigenbase.rel.JoinRelType; @@ -136,6 +137,19 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } @Override + public IterOutcome buildSchema() throws SchemaChangeException { + left.buildSchema(); + right.buildSchema(); + try { + allocateBatch(true); + worker = generateNewWorker(); + } catch (IOException | ClassTransformationException e) { + throw new SchemaChangeException(e); + } + return IterOutcome.OK_NEW_SCHEMA; + } + + @Override public IterOutcome innerNext() { if (done) { return IterOutcome.NONE; @@ -148,9 +162,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { JoinOutcome outcome = status.getOutcome(); // if the previous outcome was a change in schema or we sent a batch, we have to set up a new batch. - if (outcome == JoinOutcome.BATCH_RETURNED || - outcome == JoinOutcome.SCHEMA_CHANGED) { - allocateBatch(); + if (outcome == JoinOutcome.SCHEMA_CHANGED) { + allocateBatch(true); + } else if (outcome == JoinOutcome.BATCH_RETURNED) { + allocateBatch(false); } // reset the output position to zero after our parent iterates this RecordBatch @@ -388,7 +403,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { cg.setMappingSet(copyRightMappping); int rightVectorBase = vectorId; - if (worker == null || status.isRightPositionAllowed()) { + if (status.getLastRight() != IterOutcome.NONE && (worker == null || status.isRightPositionAllowed())) { for (VectorWrapper<?> vw : right) { MajorType inputType = vw.getField().getType(); MajorType outputType; @@ -418,18 +433,17 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { return w; } - private void allocateBatch() { + private void allocateBatch(boolean newSchema) { // allocate new batch space. - container.clear(); + container.zeroVectors(); //estimation of joinBatchSize : max of left/right size, expanded by a factor of 16, which is then bounded by MAX_BATCH_SIZE. int leftCount = worker == null ? 
left.getRecordCount() : (status.isLeftPositionAllowed() ? left.getRecordCount() : 0);
     int rightCount = worker == null ? left.getRecordCount() : (status.isRightPositionAllowed() ? right.getRecordCount() : 0);
     int joinBatchSize = Math.min(Math.max(leftCount, rightCount) * 16, MAX_BATCH_SIZE);
+    if (newSchema) {
     // add fields from both batches
-    if (worker == null || leftCount > 0) {
-      for (VectorWrapper<?> w : left) {
         MajorType inputType = w.getField().getType();
         MajorType outputType;

@@ -438,13 +452,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
         } else {
           outputType = inputType;
         }
-        ValueVector outgoingVector = TypeHelper.getNewVector(MaterializedField.create(w.getField().getPath(), outputType), oContext.getAllocator());
-        VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / Math.max(1, left.getRecordCount()))).alloc(joinBatchSize);
-        container.add(outgoingVector);
+        MaterializedField newField = MaterializedField.create(w.getField().getPath(), outputType);
+        container.addOrGet(newField);
       }
-    }
-    if (worker == null || rightCount > 0) {
       for (VectorWrapper<?> w : right) {
         MajorType inputType = w.getField().getType();
         MajorType outputType;
@@ -453,12 +464,15 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
         } else {
           outputType = inputType;
         }
-        ValueVector outgoingVector = TypeHelper.getNewVector(MaterializedField.create(w.getField().getPath(), outputType), oContext.getAllocator());
-        VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / Math.max(1, right.getRecordCount()))).alloc(joinBatchSize);
-        container.add(outgoingVector);
+        MaterializedField newField = MaterializedField.create(w.getField().getPath(), outputType);
+        container.addOrGet(newField);
       }
     }
+    for (VectorWrapper w : container) {
+      AllocationHelper.allocate(w.getValueVector(), 5000, 50);
+    }
+
     container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
     logger.debug("Built joined schema: {}", container.getSchema());
   }

@@ -467,58 +481,60 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
                                  JVar incomingLeftRecordBatch, JVar incomingRightRecordBatch,
                                  ErrorCollector collector) throws ClassTransformationException {
     cg.setMappingSet(compareMapping);
+    if (status.getLastRight() != IterOutcome.NONE) {
-    for (JoinCondition condition : conditions) {
-      final LogicalExpression leftFieldExpr = condition.getLeft();
-      final LogicalExpression rightFieldExpr = condition.getRight();
+      for (JoinCondition condition : conditions) {
+        final LogicalExpression leftFieldExpr = condition.getLeft();
+        final LogicalExpression rightFieldExpr = condition.getRight();
-      // materialize value vector readers from join expression
-      LogicalExpression materializedLeftExpr;
-      if (worker == null || status.isLeftPositionAllowed()) {
-        materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, context.getFunctionRegistry());
-      } else {
-        materializedLeftExpr = new TypedNullConstant(Types.optional(MinorType.INT));
-      }
-      if (collector.hasErrors()) {
-        throw new ClassTransformationException(String.format(
-          "Failure while trying to materialize incoming left field. Errors:\n %s.", collector.toErrorString()));
-      }
-
-      LogicalExpression materializedRightExpr;
-      if (worker == null || status.isRightPositionAllowed()) {
-        materializedRightExpr = ExpressionTreeMaterializer.materialize(rightFieldExpr, right, collector, context.getFunctionRegistry());
-      } else {
-        materializedRightExpr = new TypedNullConstant(Types.optional(MinorType.INT));
-      }
-      if (collector.hasErrors()) {
-        throw new ClassTransformationException(String.format(
-          "Failure while trying to materialize incoming right field. Errors:\n %s.", collector.toErrorString()));
-      }
-
-      // generate compare()
-      ////////////////////////
-      cg.setMappingSet(compareMapping);
-      cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingLeftRecordBatch));
-      ClassGenerator.HoldingContainer compareLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
+        // materialize value vector readers from join expression
+        LogicalExpression materializedLeftExpr;
+        if (worker == null || status.isLeftPositionAllowed()) {
+          materializedLeftExpr = ExpressionTreeMaterializer.materialize(leftFieldExpr, left, collector, context.getFunctionRegistry());
+        } else {
+          materializedLeftExpr = new TypedNullConstant(Types.optional(MinorType.INT));
+        }
+        if (collector.hasErrors()) {
+          throw new ClassTransformationException(String.format(
+            "Failure while trying to materialize incoming left field. Errors:\n %s.", collector.toErrorString()));
+        }
-      cg.setMappingSet(compareRightMapping);
-      cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingRightRecordBatch));
-      ClassGenerator.HoldingContainer compareRightExprHolder = cg.addExpr(materializedRightExpr, false);
+        LogicalExpression materializedRightExpr;
+        if (worker == null || status.isRightPositionAllowed()) {
+          materializedRightExpr = ExpressionTreeMaterializer.materialize(rightFieldExpr, right, collector, context.getFunctionRegistry());
+        } else {
+          materializedRightExpr = new TypedNullConstant(Types.optional(MinorType.INT));
+        }
+        if (collector.hasErrors()) {
+          throw new ClassTransformationException(String.format(
+            "Failure while trying to materialize incoming right field. Errors:\n %s.", collector.toErrorString()));
+        }
-      LogicalExpression fh = FunctionGenerationHelper.getComparator(compareLeftExprHolder,
-        compareRightExprHolder,
-        context.getFunctionRegistry());
-      HoldingContainer out = cg.addExpr(fh, false);
-
-      // If not 0, it means not equal. We return this out value.
-      // Null compares to Null should return null (unknown). In such a case, we return 1 to indicate they are not equal.
-      if (compareLeftExprHolder.isOptional() && compareRightExprHolder.isOptional()) {
-        JConditional jc = cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0)).
-          cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))));
-        jc._then()._return(JExpr.lit(1));
-        jc._elseif(out.getValue().ne(JExpr.lit(0)))._then()._return(out.getValue());
-      } else {
-        cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(out.getValue());
+        // generate compare()
+        ////////////////////////
+        cg.setMappingSet(compareMapping);
+        cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingLeftRecordBatch));
+        ClassGenerator.HoldingContainer compareLeftExprHolder = cg.addExpr(materializedLeftExpr, false);
+
+        cg.setMappingSet(compareRightMapping);
+        cg.getSetupBlock().assign(JExpr._this().ref(incomingRecordBatch), JExpr._this().ref(incomingRightRecordBatch));
+        ClassGenerator.HoldingContainer compareRightExprHolder = cg.addExpr(materializedRightExpr, false);
+
+        LogicalExpression fh = FunctionGenerationHelper.getComparator(compareLeftExprHolder,
+          compareRightExprHolder,
+          context.getFunctionRegistry());
+        HoldingContainer out = cg.addExpr(fh, false);
+
+        // If not 0, it means not equal. We return this out value.
+        // Null compares to Null should return null (unknown). In such a case, we return 1 to indicate they are not equal.
+        if (compareLeftExprHolder.isOptional() && compareRightExprHolder.isOptional()) {
+          JConditional jc = cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0)).
+            cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))));
+          jc._then()._return(JExpr.lit(1));
+          jc._elseif(out.getValue().ne(JExpr.lit(0)))._then()._return(out.getValue());
+        } else {
+          cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(out.getValue());
+        }
      }
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 8ffd7be..02e1a92 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -42,7 +42,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   private int recordsLeft;
   private boolean noEndLimit;
   private boolean skipBatch;
-  private boolean first = true;
   private boolean done = false;
   List<TransferPair> transfers = Lists.newArrayList();
@@ -58,14 +57,13 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   }

   @Override
-  protected void setupNewSchema() throws SchemaChangeException {
-    container.clear();
+  protected boolean setupNewSchema() throws SchemaChangeException {
+    container.zeroVectors();
     transfers.clear();

     for(VectorWrapper<?> v : incoming){
-      TransferPair pair = v.getValueVector().getTransferPair();
-      container.add(pair.getTo());
+      TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField()));
       transfers.add(pair);
     }
@@ -81,8 +79,12 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
       throw new UnsupportedOperationException();
     }
-    container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
+    if (container.isSchemaChanged()) {
+      container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
+      return true;
+    }
+    return false;
   }

   @Override
@@ -92,10 +94,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
     }

     if(!noEndLimit && recordsLeft <= 0) {
-      if (first) {
-        return produceEmptyFirstBatch();
-      }
-
       incoming.kill(true);

       IterOutcome upStream = incoming.next();
@@ -109,11 +107,8 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
         upStream = incoming.next();
       }
-      first = false;
       return IterOutcome.NONE;
     }
-
-    first = false;
     return super.innerNext();
   }
@@ -144,23 +139,6 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
     return IterOutcome.OK;
   }

-  private IterOutcome produceEmptyFirstBatch() {
-    incoming.next();
-    first = false;
-    done = true;
-    // Build the container schema and set the count
-    for (VectorWrapper<?> v : incoming) {
-      TransferPair pair = v.getValueVector().getTransferPair();
-      container.add(pair.getTo());
-      transfers.add(pair);
-    }
-    container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
-    container.setRecordCount(0);
-
-    incoming.kill(true);
-    return IterOutcome.OK_NEW_SCHEMA;
-  }
-
   private void limitWithNoSV(int recordCount) {
     int offset = Math.max(0, Math.min(recordCount - 1, recordsToSkip));
     recordsToSkip -= offset;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index ed49cf1..8da8f96 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -50,10 +52,12 @@ import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.SerializedField;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
+import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.record.RawFragmentBatchProvider;
 import org.apache.drill.exec.record.RecordBatch;
@@ -125,7 +127,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     //super(config, context);
     this.fragProviders = fragProviders;
     this.context = context;
-    this.outgoingContainer = new VectorContainer();
+    this.outgoingContainer = new VectorContainer(oContext);
     this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
     this.config = config;
   }
@@ -212,23 +214,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     // allocate the incoming record batch loaders
     senderCount = rawBatches.size();
-    if (senderCount == 0) {
-      if (firstBatch) {
-        RecordBatchLoader loader = new RecordBatchLoader(oContext.getAllocator());
-        try {
-          loader.load(emptyBatch.getHeader().getDef(), emptyBatch.getBody());
-        } catch (SchemaChangeException e) {
-          throw new RuntimeException(e);
-        }
-        for (VectorWrapper w : loader) {
-          outgoingContainer.add(w.getValueVector());
-        }
-        outgoingContainer.buildSchema(SelectionVectorMode.NONE);
-        done = true;
-        return IterOutcome.OK_NEW_SCHEMA;
-      }
-      return IterOutcome.NONE;
-    }
     incomingBatches = new RawFragmentBatch[senderCount];
     batchOffsets = new int[senderCount];
     batchLoaders = new RecordBatchLoader[senderCount];
@@ -274,9 +259,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       bldr.addField(v.getField());

       // allocate a new value vector
-      ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), oContext.getAllocator());
+      ValueVector outgoingVector = outgoingContainer.addOrGet(v.getField());
       outgoingVector.allocateNew();
-      outgoingContainer.add(outgoingVector);
       ++vectorCount;
     }
@@ -446,6 +430,24 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   }

   @Override
+  public IterOutcome buildSchema() throws SchemaChangeException {
+    stats.startProcessing();
+    try {
+      RawFragmentBatch batch = getNext(fragProviders[0]);
+      for (SerializedField field : batch.getHeader().getDef().getFieldList()) {
+        outgoingContainer.addOrGet(MaterializedField.create(field));
+      }
+    } catch (IOException e) {
+      throw new SchemaChangeException(e);
+    } finally {
+      stats.stopProcessing();
+    }
+    outgoingContainer = VectorContainer.canonicalize(outgoingContainer);
+    outgoingContainer.buildSchema(SelectionVectorMode.NONE);
+    return IterOutcome.OK_NEW_SCHEMA;
+  }
+
+  @Override
   public int getRecordCount() {
     return outgoingPosition;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 2c3e85a..9e3cfe5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -67,6 +67,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
   private final AtomicIntegerArray remainingReceivers;
   private final AtomicInteger remaingReceiverCount;
   private volatile boolean done = false;
+  private boolean first = true;

   long minReceiverRecordCount = Long.MAX_VALUE;
   long maxReceiverRecordCount = Long.MIN_VALUE;
@@ -110,6 +111,22 @@ public class PartitionSenderRootExec extends BaseRootExec {
   }

   @Override
+  public void buildSchema() throws SchemaChangeException {
+    incoming.buildSchema();
+    stats.startProcessing();
+    try {
+      createPartitioner();
+      try {
+        partitioner.flushOutgoingBatches(false, true);
+      } catch (IOException e) {
+        throw new SchemaChangeException(e);
+      }
+    } finally {
+      stats.stopProcessing();
+    }
+  }
+
+  @Override
   public boolean innerNext() {

     boolean newSchema = false;
@@ -128,6 +145,10 @@ public class PartitionSenderRootExec extends BaseRootExec {
     }

     logger.debug("Partitioner.next(): got next record batch with status {}", out);
+    if (first && out == IterOutcome.OK) {
+      first = false;
+      out = IterOutcome.OK_NEW_SCHEMA;
+    }
     switch(out){
       case NONE:
         try {
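The hunks above all apply the same rewrite: instead of building a fresh ValueVector with TypeHelper.getNewVector() and container.add() on every schema setup, an operator now asks its container for the field via addOrGet() and wires a transfer pair directly onto the returned vector. A minimal sketch of the resulting setupNewSchema() shape, modeled on the LimitRecordBatch hunk above (the surrounding class, its fields, and the operator-specific SelectionVectorMode are elided or assumed):

    protected boolean setupNewSchema() throws SchemaChangeException {
      container.zeroVectors();   // keep the vectors; release only their buffers
      transfers.clear();
      for (VectorWrapper<?> v : incoming) {
        // addOrGet() returns the existing vector when a field with the same
        // name and type is already present, so repeated calls reuse storage
        TransferPair pair = v.getValueVector().makeTransferPair(container.addOrGet(v.getField()));
        transfers.add(pair);
      }
      if (container.isSchemaChanged()) {   // true only when a vector was actually (re)created
        container.buildSchema(BatchSchema.SelectionVectorMode.TWO_BYTE);
        return true;                       // caller should surface OK_NEW_SCHEMA
      }
      return false;                        // schema unchanged; caller can emit plain OK
    }

Because setupNewSchema() now reports whether the container really changed, an operator can absorb an upstream OK_NEW_SCHEMA that did not alter its own output schema, which keeps the early schema batch from being followed by a redundant one.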
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 338a704..5224f75 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -212,6 +212,7 @@ public abstract class PartitionerTemplate implements Partitioner {
     private final int oppositeMinorFragmentId;

     private boolean isLast = false;
+    private boolean isFirst = true;
     private volatile boolean terminated = false;
     private boolean dropAll = false;
     private BatchSchema outSchema;
@@ -289,9 +290,9 @@ public abstract class PartitionerTemplate implements Partitioner {
         this.sendCount.increment();
       } else {
         logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : ""));
-        if (isLast || terminated) {
+        if (isFirst || isLast || terminated) {
           // send final (empty) batch
-          FragmentWritableBatch writableBatch = new FragmentWritableBatch(true,
+          FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast || terminated,
             handle.getQueryId(),
             handle.getMajorFragmentId(),
             handle.getMinorFragmentId(),
@@ -306,7 +307,12 @@ public abstract class PartitionerTemplate implements Partitioner {
           }
           this.sendCount.increment();
           vectorContainer.zeroVectors();
-          dropAll = true;
+          if (!isFirst) {
+            dropAll = true;
+          }
+          if (isFirst) {
+            isFirst = !isFirst;
+          }
           return;
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 7f3a966..fd2878f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;

 import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -56,6 +57,30 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
   }

   @Override
+  public IterOutcome buildSchema() throws SchemaChangeException {
+    stats.startProcessing();
+    try {
+      stats.stopProcessing();
+      try {
+        incoming.buildSchema();
+      } finally {
+        stats.startProcessing();
+      }
+      stats.startSetup();
+      try {
+        for (VectorWrapper w : incoming) {
+          container.addOrGet(w.getField());
+        }
+      } finally {
+        stats.stopSetup();
+      }
+    } finally {
+      stats.stopProcessing();
+    }
+    return IterOutcome.OK_NEW_SCHEMA;
+  }
+
+  @Override
   public IterOutcome innerNext() {
     if (!running) {
       producer.start();
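ProducerConsumerBatch's buildSchema() above shows the general handshake this change threads through the operators: before any rows are produced, a batch asks its child to build a schema, mirrors the child's fields into its own container with addOrGet(), and returns OK_NEW_SCHEMA so an empty, schema-only batch can travel to the client immediately. A condensed sketch for a generic single-input operator (the class itself is hypothetical, but the calls match the ones used in this patch):

    @Override
    public IterOutcome buildSchema() throws SchemaChangeException {
      incoming.buildSchema();                // recurse: upstream declares its schema first
      for (VectorWrapper<?> w : incoming) {
        container.addOrGet(w.getField());    // mirror fields; no row data is allocated yet
      }
      container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
      return IterOutcome.OK_NEW_SCHEMA;      // zero-record batch carrying only the schema
    }

On the sender side the same idea appears as the isFirst flag in PartitionerTemplate above: the first flush is allowed to go out empty and non-final, purely so that receivers learn the schema before the first data batch arrives.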
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/63d3008e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 224753e..486fb12 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -62,6 +62,7 @@ import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.MapVector;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;

 import com.carrotsearch.hppc.IntOpenHashSet;
@@ -79,6 +80,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   private boolean hasRemainder = false;
   private int remainderIndex = 0;
   private int recordCount;
+  private boolean buildingSchema = true;

   private static final String EMPTY_STRING = "";
@@ -137,6 +139,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     // VectorUtil.showVectorAccessibleContent(incoming, ",");
     int incomingRecordCount = incoming.getRecordCount();
+    container.zeroVectors();
+
     if (!doAlloc()) {
       outOfMemory = true;
       return IterOutcome.OUT_OF_MEMORY;
@@ -261,9 +265,25 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
   }

   @Override
-  protected void setupNewSchema() throws SchemaChangeException {
+  public IterOutcome buildSchema() throws SchemaChangeException {
+    incoming.buildSchema();
+    setupNewSchema();
+    return IterOutcome.OK_NEW_SCHEMA;
+  }
+
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
+    if (allocationVectors != null) {
+      for (ValueVector v : allocationVectors) {
+        v.clear();
+      }
+    }
     this.allocationVectors = Lists.newArrayList();
-    container.clear();
+    if (complexWriters != null) {
+      container.clear();
+    } else {
+      container.zeroVectors();
+    }
     final List<NamedExpression> exprs = getExpressionList();
     final ErrorCollector collector = new ErrorCollectorImpl();
     final List<TransferPair> transfers = Lists.newArrayList();
@@ -300,9 +320,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
           continue;
         }
         FieldReference ref = new FieldReference(name);
-        TransferPair tp = wrapper.getValueVector().getTransferPair(ref);
+        ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vvIn.getField().getType()));
+        TransferPair tp = vvIn.makeTransferPair(vvOut);
         transfers.add(tp);
-        container.add(tp.getTo());
       }
     } else if (value != null && value.intValue() > 1) { // subsequent wildcards should do a copy of incoming valuevectors
       int k = 0;
@@ -323,9 +343,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       }

       MaterializedField outputField = MaterializedField.create(name, expr.getMajorType());
-      ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+      ValueVector vv = container.addOrGet(outputField);
       allocationVectors.add(vv);
-      TypedFieldId fid = container.add(vv);
+      TypedFieldId fid = container.getValueVectorId(outputField.getPath());
       ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
       HoldingContainer hc = cg.addExpr(write);
@@ -363,9 +383,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
       Preconditions.checkNotNull(incoming);

-      TransferPair tp = vvIn.getTransferPair(getRef(namedExpression));
+      FieldReference ref = getRef(namedExpression);
+      ValueVector vvOut = container.addOrGet(MaterializedField.create(ref, vectorRead.getMajorType()));
+      TransferPair tp = vvIn.makeTransferPair(vvOut);
       transfers.add(tp);
-      container.add(tp.getTo());
       transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
       logger.debug("Added transfer for project expression.");
     } else if (expr instanceof DrillFuncHolderExpr &&
@@ -379,11 +400,16 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
       // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
       ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef());
       cg.addExpr(expr);
+      if (buildingSchema) {
+        buildingSchema = false;
+        MaterializedField f = MaterializedField.create(outputField.getPath().getAsUnescapedPath(), Types.required(MinorType.MAP));
+        container.addOrGet(f);
+      }
     } else{
       // need to do evaluation.
-      ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+      ValueVector vector = container.addOrGet(outputField);
       allocationVectors.add(vector);
-      TypedFieldId fid = container.add(vector);
+      TypedFieldId fid = container.getValueVectorId(outputField.getPath());
       ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
       HoldingContainer hc = cg.addExpr(write);
@@ -395,7 +421,6 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     cg.rotateBlock();
     cg.getEvalBlock()._return(JExpr.TRUE);

-    container.buildSchema(SelectionVectorMode.NONE);
     try {
       this.projector = context.getImplementationClass(cg.getCodeGenerator());
@@ -403,6 +428,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
     } catch (ClassTransformationException | IOException e) {
       throw new SchemaChangeException("Failure while attempting to load generated class", e);
     }
+    if (container.isSchemaChanged()) {
+      container.buildSchema(SelectionVectorMode.NONE);
+      return true;
+    } else {
+      return false;
+    }
   }

   private List<NamedExpression> getExpressionList() {
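ProjectRecordBatch above follows the same two halves: buildSchema() runs setupNewSchema() once against the child's empty schema batch, and setupNewSchema() rebuilds the output schema only when container.isSchemaChanged() reports that a vector was actually added or retyped. The boolean result is consumed by the caller roughly as sketched below (modeled on the AbstractSingleRecordBatch change listed in the diffstat; doWork() and the exact surrounding control flow are assumptions, not the literal patch):

    @Override
    public IterOutcome innerNext() {
      IterOutcome upstream = incoming.next();
      switch (upstream) {
        case OK_NEW_SCHEMA:
          if (!setupNewSchema()) {
            upstream = IterOutcome.OK;   // vectors were reused; suppress the schema change
          }
          // fall through and process the rows as an ordinary batch
        case OK:
          doWork();
          return upstream;
        default:
          return upstream;
      }
    }

PartitionSenderRootExec applies the inverse correction: having already sent the schema from buildSchema(), it promotes the first OK it sees from upstream to OK_NEW_SCHEMA so that the setup work normally triggered by a schema change still runs for the first data batch.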
