This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 9d8ac02 DRILL-7439: Batch count fixes for six additional operators 9d8ac02 is described below commit 9d8ac02d05cf6f23ddc80065066722b121577656 Author: Paul Rogers <par0...@yahoo.com> AuthorDate: Tue Nov 5 13:50:56 2019 -0800 DRILL-7439: Batch count fixes for six additional operators Enables vector checks, and fixes batch count and vector issues for: * StreamingAggBatch * RuntimeFilterRecordBatch * FlattenRecordBatch * MergeJoinBatch * NestedLoopJoinBatch * LimitRecordBatch Also fixes a zero-size batch validity issue for the CSV reader when all files contain no data. Includes code cleanup for files touched in this PR. closes #1893 --- .../org/apache/drill/exec/util/AssertionUtil.java | 7 +- .../physical/impl/aggregate/HashAggTemplate.java | 109 +++++------ .../physical/impl/aggregate/StreamingAggBatch.java | 124 +++++++------ .../impl/aggregate/StreamingAggTemplate.java | 43 ++--- .../exec/physical/impl/common/HashPartition.java | 73 +++++--- .../impl/filter/RuntimeFilterRecordBatch.java | 51 +++--- .../physical/impl/flatten/FlattenRecordBatch.java | 201 ++++++++++----------- .../exec/physical/impl/join/MergeJoinBatch.java | 15 +- .../physical/impl/join/NestedLoopJoinBatch.java | 20 +- .../exec/physical/impl/limit/LimitRecordBatch.java | 23 ++- .../impl/protocol/OperatorRecordBatch.java | 4 +- .../drill/exec/physical/impl/scan/ReaderState.java | 4 +- .../exec/physical/impl/scan/ScanOperatorExec.java | 11 +- .../impl/scan/framework/ManagedScanFramework.java | 4 +- .../physical/impl/scan/project/ResolvedTuple.java | 16 +- .../physical/impl/validate/BatchValidator.java | 71 +++++--- .../resultSet/impl/ResultSetLoaderImpl.java | 4 +- .../drill/exec/nested/TestFastComplexSchema.java | 4 +- .../exec/physical/impl/flatten/TestFlatten.java | 11 +- .../main/codegen/templates/FixedValueVectors.java | 64 +++---- .../accessor/writer/OffsetVectorWriterImpl.java | 31 ++-- 
.../vector/complex/BaseRepeatedValueVector.java | 13 +- 22 files changed, 470 insertions(+), 433 deletions(-) diff --git a/common/src/main/java/org/apache/drill/exec/util/AssertionUtil.java b/common/src/main/java/org/apache/drill/exec/util/AssertionUtil.java index f4fc9ab..446825a 100644 --- a/common/src/main/java/org/apache/drill/exec/util/AssertionUtil.java +++ b/common/src/main/java/org/apache/drill/exec/util/AssertionUtil.java @@ -18,11 +18,9 @@ package org.apache.drill.exec.util; public class AssertionUtil { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AssertionUtil.class); - public static final boolean ASSERT_ENABLED; - static{ + static { boolean isAssertEnabled = false; assert isAssertEnabled = true; ASSERT_ENABLED = isAssertEnabled; @@ -32,6 +30,5 @@ public class AssertionUtil { return ASSERT_ENABLED; } - private AssertionUtil() { - } + private AssertionUtil() { } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 6cec50e..d166353 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -17,6 +17,9 @@ */ package org.apache.drill.exec.physical.impl.aggregate; +import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_MASK; +import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_ROW_COUNT; + import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -31,7 +34,6 @@ import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.LogicalExpression; - import org.apache.drill.exec.ExecConstants; import 
org.apache.drill.exec.cache.VectorSerializer.Writer; import org.apache.drill.exec.compile.sig.RuntimeOverridden; @@ -40,7 +42,6 @@ import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.TypeHelper; - import org.apache.drill.exec.memory.BaseAllocator; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; @@ -55,40 +56,30 @@ import org.apache.drill.exec.physical.impl.common.HashTable; import org.apache.drill.exec.physical.impl.common.HashTableConfig; import org.apache.drill.exec.physical.impl.common.HashTableStats; import org.apache.drill.exec.physical.impl.common.IndexPointer; - import org.apache.drill.exec.physical.impl.common.SpilledState; -import org.apache.drill.exec.record.RecordBatchSizer; - import org.apache.drill.exec.physical.impl.spill.SpillSet; import org.apache.drill.exec.planner.physical.AggPrelBase; - +import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; - import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.BatchSchema; - -import org.apache.drill.exec.record.VectorContainer; - -import org.apache.drill.exec.record.TypedFieldId; - import org.apache.drill.exec.record.RecordBatch.IterOutcome; +import org.apache.drill.exec.record.RecordBatchSizer; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.util.record.RecordBatchStats; import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; import org.apache.drill.exec.vector.AllocationHelper; - import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.ObjectVector; import 
org.apache.drill.exec.vector.ValueVector; - import org.apache.drill.exec.vector.VariableWidthVector; - -import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_MASK; -import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_ROW_COUNT; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class HashAggTemplate implements HashAggregator { - protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class); + protected static final Logger logger = LoggerFactory.getLogger(HashAggregator.class); private static final int VARIABLE_MAX_WIDTH_VALUE_SIZE = 50; private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8; @@ -98,40 +89,40 @@ public abstract class HashAggTemplate implements HashAggregator { private static final boolean EXTRA_DEBUG_SPILL = false; // Fields needed for partitioning (the groups into partitions) - private int nextPartitionToReturn = 0; // which partition to return the next batch from + private int nextPartitionToReturn; // which partition to return the next batch from // The following members are used for logging, metrics, etc. 
- private int rowsInPartition = 0; // counts #rows in each partition - private int rowsNotSpilled = 0; - private int rowsSpilled = 0; - private int rowsSpilledReturned = 0; - private int rowsReturnedEarly = 0; + private int rowsInPartition; // counts #rows in each partition + private int rowsNotSpilled; + private int rowsSpilled; + private int rowsSpilledReturned; + private int rowsReturnedEarly; private AggPrelBase.OperatorPhase phase; private boolean canSpill = true; // make it false in case can not spill/return-early private ChainedHashTable baseHashTable; - private boolean earlyOutput = false; // when 1st phase returns a partition due to no memory - private int earlyPartition = 0; // which partition to return early - private boolean retrySameIndex = false; // in case put failed during 1st phase - need to output early, then retry - private boolean useMemoryPrediction = false; // whether to use memory prediction to decide when to spill - private long estMaxBatchSize = 0; // used for adjusting #partitions and deciding when to spill - private long estRowWidth = 0; // the size of the internal "row" (keys + values + extra columns) - private long estValuesRowWidth = 0; // the size of the internal values ( values + extra ) - private long estOutputRowWidth = 0; // the size of the output "row" (no extra columns) - private long estValuesBatchSize = 0; // used for "reserving" memory for the Values batch to overcome an OOM - private long estOutgoingAllocSize = 0; // used for "reserving" memory for the Outgoing Output Values to overcome an OOM + private boolean earlyOutput; // when 1st phase returns a partition due to no memory + private int earlyPartition; // which partition to return early + private boolean retrySameIndex; // in case put failed during 1st phase - need to output early, then retry + private boolean useMemoryPrediction; // whether to use memory prediction to decide when to spill + private long estMaxBatchSize; // used for adjusting #partitions and deciding 
when to spill + private long estRowWidth; // the size of the internal "row" (keys + values + extra columns) + private long estValuesRowWidth; // the size of the internal values ( values + extra ) + private long estOutputRowWidth; // the size of the output "row" (no extra columns) + private long estValuesBatchSize; // used for "reserving" memory for the Values batch to overcome an OOM + private long estOutgoingAllocSize; // used for "reserving" memory for the Outgoing Output Values to overcome an OOM private long reserveValueBatchMemory; // keep "reserve memory" for Values Batch private long reserveOutgoingMemory; // keep "reserve memory" for the Outgoing (Values only) output private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars private long minBatchesPerPartition; // for tuning - num partitions and spill decision - private long plannedBatches = 0; // account for planned, but not yet allocated batches + private long plannedBatches; // account for planned, but not yet allocated batches - private int underlyingIndex = 0; - private int currentIndex = 0; + private int underlyingIndex; + private int currentIndex; private IterOutcome outcome; - private int numGroupedRecords = 0; - private int currentBatchRecordCount = 0; // Performance: Avoid repeated calls to getRecordCount() + private int numGroupedRecords; + private int currentBatchRecordCount; // Performance: Avoid repeated calls to getRecordCount() - private int lastBatchOutputCount = 0; + private int lastBatchOutputCount; private RecordBatch incoming; private BatchSchema schema; private HashAggBatch outgoing; @@ -148,7 +139,7 @@ public abstract class HashAggTemplate implements HashAggregator { // For handling spilling private HashAggUpdater updater; - private SpilledState<HashAggSpilledPartition> spilledState = new SpilledState<>(); + private final SpilledState<HashAggSpilledPartition> spilledState = new SpilledState<>(); private SpillSet spillSet; 
SpilledRecordbatch newIncoming; // when reading a spilled file - work like an "incoming" private Writer writers[]; // a vector writer for each spilled partition @@ -157,17 +148,17 @@ public abstract class HashAggTemplate implements HashAggregator { private int originalPartition = -1; // the partition a secondary reads from private IndexPointer htIdxHolder; // holder for the Hashtable's internal index returned by put() - private int numGroupByOutFields = 0; // Note: this should be <= number of group-by fields + private int numGroupByOutFields; // Note: this should be <= number of group-by fields private TypedFieldId[] groupByOutFieldIds; private MaterializedField[] materializedValueFields; - private boolean allFlushed = false; - private boolean buildComplete = false; - private boolean handlingSpills = false; // True once starting to process spill files - private boolean handleEmit = false; // true after receiving an EMIT, till finish handling it + private boolean allFlushed; + private boolean buildComplete; + private boolean handlingSpills; // True once starting to process spill files + private boolean handleEmit; // true after receiving an EMIT, till finish handling it - private OperatorStats stats = null; - private HashTableStats htStats = new HashTableStats(); + private OperatorStats stats; + private final HashTableStats htStats = new HashTableStats(); public enum Metric implements MetricDef { @@ -198,9 +189,9 @@ public abstract class HashAggTemplate implements HashAggregator { } public class BatchHolder { - private VectorContainer aggrValuesContainer; // container for aggr values (workspace variables) + private final VectorContainer aggrValuesContainer; // container for aggr values (workspace variables) private int maxOccupiedIdx = -1; - private int targetBatchRowCount = 0; + private int targetBatchRowCount; public int getTargetBatchRowCount() { return targetBatchRowCount; @@ -1009,11 +1000,7 @@ public abstract class HashAggTemplate implements HashAggregator { 
this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, numOutputRecords); // set the value count for outgoing batch value vectors - for (VectorWrapper<?> v : outgoing) { - v.getValueVector().getMutator().setValueCount(numOutputRecords); - } - - outContainer.setRecordCount(numOutputRecords); + outContainer.setValueCount(numOutputRecords); WritableBatch batch = WritableBatch.getBatchNoHVWrap(numOutputRecords, outContainer, false); try { writers[part].write(batch, null); @@ -1067,10 +1054,8 @@ public abstract class HashAggTemplate implements HashAggregator { if ( handleEmit && ( batchHolders == null || batchHolders[0].size() == 0 ) ) { lastBatchOutputCount = 0; // empty allocateOutgoing(0); - for (VectorWrapper<?> v : outgoing) { - v.getValueVector().getMutator().setValueCount(0); - } - outgoing.getContainer().setRecordCount(0); + outgoing.getContainer().setValueCount(0); + // When returning the last outgoing batch (following an incoming EMIT), then replace OK with EMIT this.outcome = IterOutcome.EMIT; handleEmit = false; // finish handling EMIT @@ -1184,9 +1169,7 @@ public abstract class HashAggTemplate implements HashAggregator { this.htables[partitionToReturn].outputKeys(currOutBatchIndex, this.outContainer, numPendingOutput); // set the value count for outgoing batch value vectors - for (VectorWrapper<?> v : outgoing) { - v.getValueVector().getMutator().setValueCount(numOutputRecords); - } + outgoing.getContainer().setValueCount(numOutputRecords); outgoing.getRecordBatchMemoryManager().updateOutgoingStats(numOutputRecords); RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, outgoing, outgoing.getRecordBatchStatsContext()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index 471904f..7d02c99 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -17,11 +17,15 @@ */ package org.apache.drill.exec.physical.impl.aggregate; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP; + import java.io.IOException; import java.util.List; -import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ErrorCollector; @@ -63,24 +67,22 @@ import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.UntypedNullHolder; import org.apache.drill.exec.vector.UntypedNullVector; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.writer.BaseWriter; +import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.sun.codemodel.JExpr; import com.sun.codemodel.JVar; -import org.apache.drill.exec.vector.complex.writer.BaseWriter; - -import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; -import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; -import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK; -import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; 
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP; public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatch.class); + static final Logger logger = LoggerFactory.getLogger(StreamingAggBatch.class); protected StreamingAggregator aggregator; protected final RecordBatch incoming; private List<BaseWriter.ComplexWriter> complexWriters; - // + // Streaming agg can be in (a) a normal pipeline or (b) it may be in a pipeline that is part of a subquery involving // lateral and unnest. In case(a), the aggregator proceeds normally until it sees a group change or a NONE. If a // group has changed, the aggregated data is sent downstream and the aggregation continues with the next group. If @@ -100,33 +102,37 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { // Schema change within a Data Set is not supported. // // We will define some states for internal management - // - private boolean done = false; // END of all data + + private boolean done; // END of all data private boolean first = true; // Beginning of new data set. True during the build schema phase. False once the first // call to inner next is made. - private boolean sendEmit = false; // In the case where we see an OK_NEW_SCHEMA along with the end of a data set - // we send out a batch with OK_NEW_SCHEMA first, then in the next iteration, - // we send out an empty batch with EMIT. + private boolean sendEmit; // In the case where we see an OK_NEW_SCHEMA along with the end of a data set + // we send out a batch with OK_NEW_SCHEMA first, then in the next iteration, + // we send out an empty batch with EMIT. 
private IterOutcome lastKnownOutcome = OK; // keep track of the outcome from the previous call to incoming.next private boolean firstBatchForSchema = true; // true if the current batch came in with an OK_NEW_SCHEMA private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set - private int recordCount = 0; // number of records output in the current data set + private int recordCount; // number of records output in the current data set private BatchSchema incomingSchema; /* - * DRILL-2277, DRILL-2411: For straight aggregates without a group by clause we need to perform special handling when - * the incoming batch is empty. In the case of the empty input into the streaming aggregate we need - * to return a single batch with one row. For count we need to return 0 and for all other aggregate - * functions like sum, avg etc we need to return an explicit row with NULL. Since we correctly allocate the type of - * the outgoing vectors (required for count and nullable for other aggregate functions) all we really need to do - * is simply set the record count to be 1 in such cases. For nullable vectors we don't need to do anything because - * if we don't set anything the output will be NULL, however for required vectors we explicitly zero out the vector - * since we don't zero it out while allocating it. + * DRILL-2277, DRILL-2411: For straight aggregates without a group by clause + * we need to perform special handling when the incoming batch is empty. In + * the case of the empty input into the streaming aggregate we need to return + * a single batch with one row. For count we need to return 0 and for all + * other aggregate functions like sum, avg etc we need to return an explicit + * row with NULL. Since we correctly allocate the type of the outgoing vectors + * (required for count and nullable for other aggregate functions) all we + * really need to do is simply set the record count to be 1 in such cases. 
For + * nullable vectors we don't need to do anything because if we don't set + * anything the output will be NULL, however for required vectors we + * explicitly zero out the vector since we don't zero it out while allocating + * it. * * We maintain some state to remember that we have done such special handling. */ - private boolean specialBatchSent = false; + private boolean specialBatchSent; private static final int SPECIAL_BATCH_COUNT = 1; // TODO: Needs to adapt to batch sizing rather than hardcoded constant value @@ -156,7 +162,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { @Override public VectorContainer getOutgoingContainer() { - return this.container; + return container; } @Override @@ -177,17 +183,18 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { break; } - this.incomingSchema = incoming.getSchema(); + incomingSchema = incoming.getSchema(); if (!createAggregator()) { state = BatchState.DONE; } - for (final VectorWrapper<?> w : container) { + for (VectorWrapper<?> w : container) { w.getValueVector().allocateNew(); } if (complexWriters != null) { container.buildSchema(SelectionVectorMode.NONE); } + container.setRecordCount(0); } @Override @@ -207,6 +214,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { firstBatchForDataSet = true; firstBatchForSchema = false; recordCount = 0; + container.setEmpty(); specialBatchSent = false; return EMIT; } @@ -379,7 +387,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { private void allocateComplexWriters() { // Allocate the complex writers before processing the incoming batch if (complexWriters != null) { - for (final BaseWriter.ComplexWriter writer : complexWriters) { + for (BaseWriter.ComplexWriter writer : complexWriters) { writer.allocate(); } } @@ -393,8 +401,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { */ private void 
constructSpecialBatch() { int exprIndex = 0; - for (final VectorWrapper<?> vw: container) { - final ValueVector vv = vw.getValueVector(); + for (VectorWrapper<?> vw: container) { + ValueVector vv = vw.getValueVector(); AllocationHelper.allocateNew(vv, SPECIAL_BATCH_COUNT); vv.getMutator().setValueCount(SPECIAL_BATCH_COUNT); if (vv.getField().getType().getMode() == TypeProtos.DataMode.REQUIRED) { @@ -442,7 +450,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } } - public void addComplexWriter(final BaseWriter.ComplexWriter writer) { + public void addComplexWriter(BaseWriter.ComplexWriter writer) { complexWriters.add(writer); } @@ -460,21 +468,21 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { ErrorCollector collector = new ErrorCollectorImpl(); for (int i = 0; i < keyExprs.length; i++) { - final NamedExpression ne = popConfig.getKeys().get(i); - final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector,context.getFunctionRegistry() ); + NamedExpression ne = popConfig.getKeys().get(i); + LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector,context.getFunctionRegistry() ); if (expr == null) { continue; } keyExprs[i] = expr; - final MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(), + MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(), expr.getMajorType()); - final ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); + ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); keyOutputIds[i] = container.add(vector); } for (int i = 0; i < valueExprs.length; i++) { - final NamedExpression ne = popConfig.getExprs().get(i); - final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, 
context.getFunctionRegistry()); + NamedExpression ne = popConfig.getExprs().get(i); + LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry()); if (expr instanceof IfExpression) { throw UserException.unsupportedError(new UnsupportedOperationException("Union type not supported in aggregate functions")).build(logger); } @@ -498,7 +506,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { container.add(new UntypedNullVector(field, container.getAllocator())); valueExprs[i] = expr; } else { - final MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(), + MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(), expr.getMajorType()); ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); TypedFieldId id = container.add(vector); @@ -532,40 +540,41 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { protected void setupIsSame(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) { cg.setMappingSet(IS_SAME_I1); - for (final LogicalExpression expr : keyExprs) { + for (LogicalExpression expr : keyExprs) { // first, we rewrite the evaluation stack for each side of the comparison. 
cg.setMappingSet(IS_SAME_I1); - final HoldingContainer first = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE); + HoldingContainer first = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE); cg.setMappingSet(IS_SAME_I2); - final HoldingContainer second = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE); + HoldingContainer second = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE); - final LogicalExpression fh = + LogicalExpression fh = FunctionGenerationHelper .getOrderingComparatorNullsHigh(first, second, context.getFunctionRegistry()); - final HoldingContainer out = cg.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE); + HoldingContainer out = cg.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE); cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE); } cg.getEvalBlock()._return(JExpr.TRUE); } - private final GeneratorMapping IS_SAME_PREV_INTERNAL_BATCH_READ = GeneratorMapping.create("isSamePrev", "isSamePrev", null, null); // the internal batch changes each time so we need to redo setup. + // the internal batch changes each time so we need to redo setup. + private final GeneratorMapping IS_SAME_PREV_INTERNAL_BATCH_READ = GeneratorMapping.create("isSamePrev", "isSamePrev", null, null); private final GeneratorMapping IS_SAME_PREV = GeneratorMapping.create("setupInterior", "isSamePrev", null, null); private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ); private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV); protected void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) { cg.setMappingSet(ISA_B1); - for (final LogicalExpression expr : keyExprs) { + for (LogicalExpression expr : keyExprs) { // first, we rewrite the evaluation stack for each side of the comparison. 
cg.setMappingSet(ISA_B1); - final HoldingContainer first = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE); + HoldingContainer first = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE); cg.setMappingSet(ISA_B2); - final HoldingContainer second = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE); + HoldingContainer second = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE); - final LogicalExpression fh = + LogicalExpression fh = FunctionGenerationHelper .getOrderingComparatorNullsHigh(first, second, context.getFunctionRegistry()); - final HoldingContainer out = cg.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE); + HoldingContainer out = cg.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE); cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE); } cg.getEvalBlock()._return(JExpr.TRUE); @@ -577,7 +586,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { protected void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs) { cg.setMappingSet(EVAL); - for (final LogicalExpression ex : valueExprs) { + for (LogicalExpression ex : valueExprs) { cg.addExpr(ex); } } @@ -601,11 +610,13 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { cg.setMappingSet(RECORD_KEYS_PREV); for (int i = 0; i < keyExprs.length; i++) { - // IMPORTANT: there is an implicit assertion here that the TypedFieldIds for the previous batch and the current batch are the same. This is possible because InternalBatch guarantees this. + // IMPORTANT: there is an implicit assertion here that the TypedFieldIds + // for the previous batch and the current batch are the same. This is + // possible because InternalBatch guarantees this. 
logger.debug("Writing out expr {}", keyExprs[i]); cg.rotateBlock(); cg.setMappingSet(RECORD_KEYS_PREV); - final HoldingContainer innerExpression = cg.addExpr(keyExprs[i], ClassGenerator.BlkCreateMode.FALSE); + HoldingContainer innerExpression = cg.addExpr(keyExprs[i], ClassGenerator.BlkCreateMode.FALSE); cg.setMappingSet(RECORD_KEYS_PREV_OUT); cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], new HoldingContainerExpression(innerExpression), true), ClassGenerator.BlkCreateMode.FALSE); } @@ -655,11 +666,6 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } @Override - public void close() { - super.close(); - } - - @Override protected void killIncoming(boolean sendUpstream) { incoming.kill(sendUpstream); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java index cc89f23..173ed6a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java @@ -17,6 +17,11 @@ */ package org.apache.drill.exec.physical.impl.aggregate; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; + import javax.inject.Named; import org.apache.drill.exec.exception.SchemaChangeException; @@ -25,32 +30,29 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.ValueVector; - -import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; 
-import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; -import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK; -import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class StreamingAggTemplate implements StreamingAggregator { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggregator.class); + private static final Logger logger = LoggerFactory.getLogger(StreamingAggregator.class); private static final boolean EXTRA_DEBUG = false; private int maxOutputRows = ValueVector.MAX_ROW_COUNT; // lastOutcome is set ONLY if the lastOutcome was NONE or STOP - private IterOutcome lastOutcome = null; + private IterOutcome lastOutcome; // First batch after build schema phase private boolean first = true; - private boolean firstBatchForSchema = false; // true if the current batch came in with an OK_NEW_SCHEMA. + private boolean firstBatchForSchema; // true if the current batch came in with an OK_NEW_SCHEMA. private boolean firstBatchForDataSet = true; // true if the current batch is the first batch in a data set - private boolean newSchema = false; + private boolean newSchema; // End of all data - private boolean done = false; + private boolean done; // index in the incoming (sv4/sv2/vector) - private int underlyingIndex = 0; + private int underlyingIndex; // The indexes below refer to the actual record indexes in input batch // (i.e if a selection vector the sv4/sv2 entry has been dereferenced or if a vector then the record index itself) private int previousIndex = -1; // the last index that has been processed. Initialized to -1 every time a new @@ -59,19 +61,18 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { /** * Number of records added to the current aggregation group. 
*/ - private long addedRecordCount = 0; + private long addedRecordCount; // There are two outcomes from the aggregator. One is the aggregator's outcome defined in // StreamingAggregator.AggOutcome. The other is the outcome from the last call to incoming.next private IterOutcome outcome; // Number of aggregation groups added into the output batch - private int outputCount = 0; + private int outputCount; private RecordBatch incoming; // the Streaming Agg Batch that this aggregator belongs to private StreamingAggBatch outgoing; private OperatorContext context; - @Override public void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing, int outputRowCount) throws SchemaChangeException { @@ -166,7 +167,6 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { } } - if (newSchema) { return AggOutcome.UPDATE_AGGREGATOR; } @@ -350,7 +350,6 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { first = false; } } - } @Override @@ -432,11 +431,9 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { } else { outcomeToReturn = OK; } - this.outcome = outcomeToReturn; + outcome = outcomeToReturn; - for (VectorWrapper<?> v : outgoing) { - v.getValueVector().getMutator().setValueCount(outputCount); - } + outgoing.getContainer().setValueCount(outputCount); return (seenOutcome == EMIT) ? 
AggOutcome.RETURN_AND_RESET : AggOutcome.RETURN_OUTCOME; } @@ -455,11 +452,9 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { } else { outcomeToReturn = EMIT; } - this.outcome = outcomeToReturn; + outcome = outcomeToReturn; - for (VectorWrapper<?> v : outgoing) { - v.getValueVector().getMutator().setValueCount(outputCount); - } + outgoing.getContainer().setValueCount(outputCount); return AggOutcome.RETURN_AND_RESET; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java index be3c51e..0257abc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java @@ -17,12 +17,20 @@ */ package org.apache.drill.exec.physical.impl.common; -import com.carrotsearch.hppc.IntArrayList; -import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_SIZE; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; + import org.apache.commons.lang3.tuple.Pair; import org.apache.drill.common.exceptions.RetryAfterSpillException; import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.cache.VectorSerializer; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.OutOfMemoryException; @@ -47,16 +55,12 @@ import org.apache.drill.exec.vector.IntVector; import 
org.apache.drill.exec.vector.ObjectVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VariableWidthVector; -import org.apache.drill.common.types.TypeProtos.DataMode; -import org.apache.drill.common.types.TypeProtos.MajorType; -import org.apache.drill.common.types.TypeProtos.MinorType; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.TimeUnit; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_SIZE; +import com.carrotsearch.hppc.IntArrayList; /** * <h2>Overview</h2> @@ -72,14 +76,14 @@ import static org.apache.drill.exec.physical.impl.common.HashTable.BATCH_SIZE; * </p> */ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashPartition.class); + static final Logger logger = LoggerFactory.getLogger(HashPartition.class); public static final String HASH_VALUE_COLUMN_NAME = "$Hash_Values$"; private int partitionNum = -1; // the current number of this partition, as used by the operator private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8; - private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars + private final int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars public static final MajorType HVtype = MajorType.newBuilder() .setMinorType(MinorType.INT /* dataType */ ) @@ -93,7 +97,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { // While build data is incoming - temporarily keep the list of in-memory // incoming batches, per each partition (these may be spilled at some point) - 
private List<VectorContainer> tmpBatchesList; + private final List<VectorContainer> tmpBatchesList; // A batch and HV vector to hold incoming rows - per each partition private VectorContainer currentBatch; // The current (newest) batch private IntVector currHVVector; // The HV vectors for the currentBatches @@ -112,21 +116,21 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { private int partitionBatchesCount; // count number of batches spilled private String spillFile; - private BufferAllocator allocator; + private final BufferAllocator allocator; private int recordsPerBatch; - private SpillSet spillSet; + private final SpillSet spillSet; private boolean isSpilled; // is this partition spilled ? private boolean processingOuter; // is (inner done spilling and) now the outer is processed? private boolean outerBatchAllocNotNeeded; // when the inner is whole in memory - private RecordBatch buildBatch; - private RecordBatch probeBatch; - private int cycleNum; - private int numPartitions; - private List<HashJoinMemoryCalculator.BatchStat> inMemoryBatchStats = Lists.newArrayList(); + private final RecordBatch buildBatch; + private final RecordBatch probeBatch; + private final int cycleNum; + private final int numPartitions; + private final List<HashJoinMemoryCalculator.BatchStat> inMemoryBatchStats = Lists.newArrayList(); private long partitionInMemorySize; private long numInMemoryRecords; - private boolean updatedRecordsPerBatch = false; - private boolean semiJoin; + private boolean updatedRecordsPerBatch; + private final boolean semiJoin; public HashPartition(FragmentContext context, BufferAllocator allocator, ChainedHashTable baseHashTable, RecordBatch buildBatch, RecordBatch probeBatch, boolean semiJoin, @@ -188,7 +192,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { try { while (vci.hasNext()) { - VectorWrapper vw = vci.next(); + VectorWrapper<?> vw = vci.next(); // If processing a spilled container, 
skip the last column (HV) if ( cycleNum > 0 && ! vci.hasNext() ) { break; } ValueVector vv = vw.getValueVector(); @@ -254,9 +258,11 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { public void completeAnOuterBatch(boolean toInitialize) { completeABatch(toInitialize, true); } + public void completeAnInnerBatch(boolean toInitialize, boolean needsSpill) { completeABatch(toInitialize, needsSpill); } + /** * A current batch is full (or no more rows incoming) - complete processing this batch * I.e., add it to its partition's tmp list, if needed - spill that list, and if needed - @@ -349,15 +355,13 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { numInMemoryRecords = 0L; inMemoryBatchStats.clear(); - while ( tmpBatchesList.size() > 0 ) { + while (tmpBatchesList.size() > 0) { VectorContainer vc = tmpBatchesList.remove(0); int numRecords = vc.getRecordCount(); // set the value count for outgoing batch value vectors - for (VectorWrapper<?> v : vc) { - v.getValueVector().getMutator().setValueCount(numRecords); - } + vc.setValueCount(numRecords); WritableBatch wBatch = WritableBatch.getBatchNoHVWrap(numRecords, vc, false); try { @@ -381,6 +385,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { public int probeForKey(int recordsProcessed, int hashCode) throws SchemaChangeException { return hashTable.probeForKey(recordsProcessed, hashCode); } + public Pair<Integer, Boolean> getStartIndex(int probeIndex) { /* The current probe record has a key that matches. 
Get the index * of the first row in the build side that matches the current key @@ -393,16 +398,20 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { boolean matchExists = hjHelper.setRecordMatched(compositeIndex); return Pair.of(compositeIndex, matchExists); } + public int getNextIndex(int compositeIndex) { // in case of inner rows with duplicate keys, get the next one return hjHelper.getNextIndex(compositeIndex); } + public boolean setRecordMatched(int compositeIndex) { return hjHelper.setRecordMatched(compositeIndex); } + public IntArrayList getNextUnmatchedIndex() { return hjHelper.getNextUnmatchedIndex(); } + // // ===================================================================================== // @@ -410,9 +419,11 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { public int getBuildHashCode(int ind) throws SchemaChangeException { return hashTable.getBuildHashCode(ind); } + public int getProbeHashCode(int ind) throws SchemaChangeException { return hashTable.getProbeHashCode(ind); } + public ArrayList<VectorContainer> getContainers() { return containers; } @@ -457,6 +468,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { public int getPartitionBatchesCount() { return partitionBatchesCount; } + public int getPartitionNum() { return partitionNum; } @@ -468,6 +480,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { closeWriterInternal(false); processingOuter = true; // After the spill file was closed } + /** * If exists - close the writer for this partition * @@ -601,4 +614,4 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat { return String.format("[hashTable = %s]", hashTable == null ? 
"None": hashTable.makeDebugString()); } -} // class HashPartition +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java index ac6718c..28de51f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java @@ -17,6 +17,13 @@ */ package org.apache.drill.exec.physical.impl.filter; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.LogicalExpression; @@ -39,41 +46,39 @@ import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.work.filter.BloomFilter; import org.apache.drill.exec.work.filter.RuntimeFilterWritable; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * A RuntimeFilterRecordBatch steps over the ScanBatch. If the ScanBatch participates - * in the HashJoinBatch and can be applied by a RuntimeFilter, it will generate a filtered - * SV2, otherwise will generate a same recordCount-originalRecordCount SV2 which will not affect - * the Query's performance ,but just do a memory transfer by the later RemovingRecordBatch op. + * A RuntimeFilterRecordBatch steps over the ScanBatch. 
If the ScanBatch + * participates in the HashJoinBatch and can be applied by a RuntimeFilter, it + * will generate a filtered SV2, otherwise will generate a same + * recordCount-originalRecordCount SV2 which will not affect the Query's + * performance ,but just do a memory transfer by the later RemovingRecordBatch + * op. */ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeFilterPOP> { - private SelectionVector2 sv2; + private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterRecordBatch.class); + private SelectionVector2 sv2; private ValueVectorHashHelper.Hash64 hash64; - private Map<String, Integer> field2id = new HashMap<>(); + private final Map<String, Integer> field2id = new HashMap<>(); private List<String> toFilterFields; private List<BloomFilter> bloomFilters; private RuntimeFilterWritable current; private int originalRecordCount; - private long filteredRows = 0l; - private long appliedTimes = 0l; - private int batchTimes = 0; - private boolean waited = false; - private boolean enableRFWaiting; - private long maxWaitingTime; - private long rfIdentifier; - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class); + private long filteredRows; + private long appliedTimes; + private int batchTimes; + private boolean waited; + private final boolean enableRFWaiting; + private final long maxWaitingTime; + private final long rfIdentifier; public RuntimeFilterRecordBatch(RuntimeFilterPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException { super(pop, context, incoming); - enableRFWaiting = context.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY).bool_val; - maxWaitingTime = context.getOptions().getOption(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY).num_val; + enableRFWaiting = context.getOptions().getBoolean(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY); + maxWaitingTime = 
context.getOptions().getLong(ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY); this.rfIdentifier = pop.getIdentifier(); } @@ -107,6 +112,7 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF throw new UnsupportedOperationException(e); } container.transferIn(incoming.getContainer()); + container.setRecordCount(originalRecordCount); updateStats(); return getFinalOutcome(false); } @@ -258,7 +264,6 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF } } - appliedTimes++; sv2.setRecordCount(svIndex); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index 7877c6b..cee7625 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -43,16 +43,16 @@ import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.physical.config.FlattenPOP; -import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.record.RecordBatchMemoryManager; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.RecordBatchMemoryManager; +import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorAccessibleUtilities; import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.VectorWrapper; 
import org.apache.drill.exec.util.record.RecordBatchStats; import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; import org.apache.drill.exec.vector.ValueVector; @@ -60,20 +60,23 @@ import org.apache.drill.exec.vector.complex.AbstractRepeatedMapVector; import org.apache.drill.exec.vector.complex.RepeatedMapVector; import org.apache.drill.exec.vector.complex.RepeatedValueVector; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.carrotsearch.hppc.IntHashSet; import com.sun.codemodel.JExpr; -// TODO - handle the case where a user tries to flatten a scalar, should just act as a project all of the columns exactly -// as they come in +// TODO - handle the case where a user tries to flatten a scalar, should +// just act as a project all of the columns exactly as they come in + public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenRecordBatch.class); + private static final Logger logger = LoggerFactory.getLogger(FlattenRecordBatch.class); private Flattener flattener; private List<ValueVector> allocationVectors; private List<ComplexWriter> complexWriters; - private boolean hasRemainder = false; - private int remainderIndex = 0; + private boolean hasRemainder; + private int remainderIndex; private int recordCount; private final FlattenMemoryManager flattenMemoryManager; @@ -81,7 +84,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { @Override public int getBufferSizeFor(int recordCount) { int bufferSize = 0; - for(final ValueVector vv : allocationVectors) { + for (ValueVector vv : allocationVectors) { bufferSize += vv.getBufferSizeFor(recordCount); } return bufferSize; @@ -98,7 +101,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { outputNames.clear(); } - // note: don't 
clear the internal maps since they have cumulative data.. + // note: don't clear the internal maps since they have cumulative data. } } @@ -129,28 +132,28 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { // Get sizing information for the batch. setRecordBatchSizer(new RecordBatchSizer(incoming)); - final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn()); - final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]); + TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn()); + MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]); // Get column size of flatten column. RecordBatchSizer.ColumnSize columnSize = getRecordBatchSizer().getColumn(field.getName()); // Average rowWidth of flatten column - final int avgRowWidthFlattenColumn = columnSize.getNetSizePerEntry(); + int avgRowWidthFlattenColumn = columnSize.getNetSizePerEntry(); // Average rowWidth excluding the flatten column. - final int avgRowWidthWithOutFlattenColumn = getRecordBatchSizer().getNetRowWidth() - avgRowWidthFlattenColumn; + int avgRowWidthWithOutFlattenColumn = getRecordBatchSizer().getNetRowWidth() - avgRowWidthFlattenColumn; // Average rowWidth of single element in the flatten list. // subtract the offset vector size from column data size. - final int avgRowWidthSingleFlattenEntry = + int avgRowWidthSingleFlattenEntry = RecordBatchSizer.safeDivide(columnSize.getTotalNetSize() - (getOffsetVectorWidth() * columnSize.getValueCount()), columnSize.getElementCount()); // Average rowWidth of outgoing batch. 
- final int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + avgRowWidthSingleFlattenEntry; + int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + avgRowWidthSingleFlattenEntry; - final int outputBatchSize = getOutputBatchSize(); + int outputBatchSize = getOutputBatchSize(); // Number of rows in outgoing batch setOutputRowCount(outputBatchSize, avgOutgoingRowWidth); @@ -182,14 +185,12 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { return recordCount; } - @Override protected void killIncoming(boolean sendUpstream) { super.killIncoming(sendUpstream); hasRemainder = false; } - @Override public IterOutcome innerNext() { if (hasRemainder) { @@ -202,14 +203,14 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { @Override public VectorContainer getOutgoingContainer() { - return this.container; + return container; } private void setFlattenVector() { - final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn()); - final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]); - final RepeatedValueVector vector; - final ValueVector inVV = incoming.getValueAccessorById( + TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn()); + MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]); + RepeatedValueVector vector; + ValueVector inVV = incoming.getValueAccessorById( field.getValueClass(), typedFieldId.getFieldIds()).getValueVector(); if (! 
(inVV instanceof RepeatedValueVector)) { @@ -232,10 +233,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { int incomingRecordCount = incoming.getRecordCount(); - if (!doAlloc(flattenMemoryManager.getOutputRowCount())) { - outOfMemory = true; - return IterOutcome.OUT_OF_MEMORY; - } + doAlloc(flattenMemoryManager.getOutputRowCount()); // we call this in setupSchema, but we also need to call it here so we have a reference to the appropriate vector // inside of the the flattener for the current batch @@ -244,18 +242,13 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { int childCount = incomingRecordCount == 0 ? 0 : flattener.getFlattenField().getAccessor().getInnerValueCount(); int outputRecords = childCount == 0 ? 0: flattener.flattenRecords(incomingRecordCount, 0, monitor); // TODO - change this to be based on the repeated vector length + setValueCount(outputRecords); if (outputRecords < childCount) { - setValueCount(outputRecords); hasRemainder = true; remainderIndex = outputRecords; - this.recordCount = remainderIndex; } else { - setValueCount(outputRecords); flattener.resetGroupIndex(); - for(VectorWrapper<?> v: incoming) { - v.clear(); - } - this.recordCount = outputRecords; + VectorAccessibleUtilities.clear(incoming.getContainer()); } // In case of complex writer expression, vectors would be added to batch run-time. // We have to re-build the schema. @@ -276,25 +269,18 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { // remainingRecordCount can be much higher than number of rows we will have in outgoing batch. // Do memory allocation only for number of rows we are going to have in the batch. 
- if (!doAlloc(Math.min(remainingRecordCount, flattenMemoryManager.getOutputRowCount()))) { - outOfMemory = true; - return; - } + doAlloc(Math.min(remainingRecordCount, flattenMemoryManager.getOutputRowCount())); int projRecords = flattener.flattenRecords(remainingRecordCount, 0, monitor); if (projRecords < remainingRecordCount) { setValueCount(projRecords); - this.recordCount = projRecords; remainderIndex += projRecords; } else { setValueCount(remainingRecordCount); hasRemainder = false; remainderIndex = 0; - for (VectorWrapper<?> v : incoming) { - v.clear(); - } + VectorAccessibleUtilities.clear(incoming.getContainer()); flattener.resetGroupIndex(); - this.recordCount = remainingRecordCount; } // In case of complex writer expression, vectors would be added to batch run-time. // We have to re-build the schema. @@ -303,45 +289,38 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { } flattenMemoryManager.updateOutgoingStats(projRecords); - } public void addComplexWriter(ComplexWriter writer) { complexWriters.add(writer); } - private boolean doAlloc(int recordCount) { - - for (ValueVector v : this.allocationVectors) { + private void doAlloc(int recordCount) { + for (ValueVector v : allocationVectors) { // This will iteratively allocate memory for nested columns underneath. RecordBatchSizer.ColumnSize colSize = flattenMemoryManager.getColumnSize(v.getField().getName()); colSize.allocateVector(v, recordCount); } - //Allocate vv for complexWriters. - if (complexWriters == null) { - return true; - } - - for (ComplexWriter writer : complexWriters) { - writer.allocate(); + // Allocate vv for complexWriters. 
+ if (complexWriters != null) { + for (ComplexWriter writer : complexWriters) { + writer.allocate(); + } } - - return true; } private void setValueCount(int count) { - for (ValueVector v : allocationVectors) { - ValueVector.Mutator m = v.getMutator(); - m.setValueCount(count); - } - - if (complexWriters == null) { - return; + recordCount = count; + if (count == 0) { + container.setEmpty(); + } else { + container.setValueCount(count); } - - for (ComplexWriter writer : complexWriters) { - writer.setValueCount(count); + if (complexWriters != null) { + for (ComplexWriter writer : complexWriters) { + writer.setValueCount(count); + } } } @@ -350,33 +329,40 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { } /** - * The data layout is the same for the actual data within a repeated field, as it is in a scalar vector for - * the same sql type. For example, a repeated int vector has a vector of offsets into a regular int vector to - * represent the lists. As the data layout for the actual values in the same in the repeated vector as in the - * scalar vector of the same type, we can avoid making individual copies for the column being flattened, and just - * use vector copies between the inner vector of the repeated field to the resulting scalar vector from the flatten - * operation. This is completed after we determine how many records will fit (as we will hit either a batch end, or - * the end of one of the other vectors while we are copying the data of the other vectors alongside each new flattened - * value coming out of the repeated field.) + * The data layout is the same for the actual data within a repeated field, as + * it is in a scalar vector for the same sql type. For example, a repeated int + * vector has a vector of offsets into a regular int vector to represent the + * lists. 
As the data layout for the actual values in the same in the repeated + * vector as in the scalar vector of the same type, we can avoid making + * individual copies for the column being flattened, and just use vector + * copies between the inner vector of the repeated field to the resulting + * scalar vector from the flatten operation. This is completed after we + * determine how many records will fit (as we will hit either a batch end, or + * the end of one of the other vectors while we are copying the data of the + * other vectors alongside each new flattened value coming out of the repeated + * field.) */ private TransferPair getFlattenFieldTransferPair(FieldReference reference) { - final TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn()); - final Class<?> vectorClass = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]).getValueClass(); - final ValueVector flattenField = incoming.getValueAccessorById(vectorClass, fieldId.getFieldIds()).getValueVector(); + TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn()); + Class<?> vectorClass = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]).getValueClass(); + ValueVector flattenField = incoming.getValueAccessorById(vectorClass, fieldId.getFieldIds()).getValueVector(); TransferPair tp = null; if (flattenField instanceof AbstractRepeatedMapVector) { - tp = ((AbstractRepeatedMapVector) flattenField).getTransferPairToSingleMap(reference.getAsNamePart().getName(), oContext.getAllocator()); - } else if ( !(flattenField instanceof RepeatedValueVector) ) { + tp = ((AbstractRepeatedMapVector) flattenField).getTransferPairToSingleMap( + reference.getAsNamePart().getName(), oContext.getAllocator()); + } else if (!(flattenField instanceof RepeatedValueVector)) { if(incoming.getRecordCount() != 0) { - throw UserException.unsupportedError().message("Flatten does not support inputs of non-list values.").build(logger); + throw UserException.unsupportedError().message( + "Flatten does 
not support inputs of non-list values.").build(logger); } logger.error("Cannot cast {} to RepeatedValueVector", flattenField); //when incoming recordCount is 0, don't throw exception since the type being seen here is not solid - final ValueVector vv = new RepeatedMapVector(flattenField.getField(), oContext.getAllocator(), null); - tp = RepeatedValueVector.class.cast(vv).getTransferPair(reference.getAsNamePart().getName(), oContext.getAllocator()); + ValueVector vv = new RepeatedMapVector(flattenField.getField(), oContext.getAllocator(), null); + tp = RepeatedValueVector.class.cast(vv).getTransferPair( + reference.getAsNamePart().getName(), oContext.getAllocator()); } else { - final ValueVector vvIn = RepeatedValueVector.class.cast(flattenField).getDataVector(); + ValueVector vvIn = RepeatedValueVector.class.cast(flattenField).getDataVector(); // vvIn may be null because of fast schema return for repeated list vectors if (vvIn != null) { tp = vvIn.getTransferPair(reference.getAsNamePart().getName(), oContext.getAllocator()); @@ -387,28 +373,32 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { @Override protected boolean setupNewSchema() throws SchemaChangeException { - this.allocationVectors = new ArrayList<>(); + allocationVectors = new ArrayList<>(); container.clear(); - final List<NamedExpression> exprs = getExpressionList(); - final ErrorCollector collector = new ErrorCollectorImpl(); - final List<TransferPair> transfers = new ArrayList<>(); + List<NamedExpression> exprs = getExpressionList(); + ErrorCollector collector = new ErrorCollectorImpl(); + List<TransferPair> transfers = new ArrayList<>(); - final ClassGenerator<Flattener> cg = CodeGenerator.getRoot(Flattener.TEMPLATE_DEFINITION, context.getOptions()); + ClassGenerator<Flattener> cg = CodeGenerator.getRoot( + Flattener.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); - final IntHashSet transferFieldIds = new IntHashSet(); + 
IntHashSet transferFieldIds = new IntHashSet(); - final NamedExpression flattenExpr = new NamedExpression(popConfig.getColumn(), new FieldReference(popConfig.getColumn())); - final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression)ExpressionTreeMaterializer.materialize(flattenExpr.getExpr(), incoming, collector, context.getFunctionRegistry(), true); - final FieldReference fieldReference = flattenExpr.getRef(); - final TransferPair transferPair = getFlattenFieldTransferPair(fieldReference); + NamedExpression flattenExpr = new NamedExpression(popConfig.getColumn(), + new FieldReference(popConfig.getColumn())); + ValueVectorReadExpression vectorRead = (ValueVectorReadExpression)ExpressionTreeMaterializer.materialize( + flattenExpr.getExpr(), incoming, collector, context.getFunctionRegistry(), true); + FieldReference fieldReference = flattenExpr.getRef(); + TransferPair transferPair = getFlattenFieldTransferPair(fieldReference); if (transferPair != null) { - final ValueVector flattenVector = transferPair.getTo(); + ValueVector flattenVector = transferPair.getTo(); // checks that list has only default ValueVector and replaces resulting ValueVector to INT typed ValueVector if (exprs.size() == 0 && flattenVector.getField().getType().equals(Types.LATE_BIND_TYPE)) { - final MaterializedField outputField = MaterializedField.create(fieldReference.getAsNamePart().getName(), Types.OPTIONAL_INT); - final ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); + MaterializedField outputField = MaterializedField.create( + fieldReference.getAsNamePart().getName(), Types.OPTIONAL_INT); + ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); container.add(vector); } else { @@ -435,9 +425,11 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { } } - final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, 
context.getFunctionRegistry(), true); + LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), + incoming, collector, context.getFunctionRegistry(), true); if (collector.hasErrors()) { - throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); + throw new SchemaChangeException(String.format( + "Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); } if (expr instanceof DrillFuncHolderExpr && ((DrillFuncHolderExpr) expr).getHolder().isComplexWriterFuncHolder()) { @@ -452,10 +444,11 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { cg.addExpr(expr); } else { // need to do evaluation. - final MaterializedField outputField; + MaterializedField outputField; if (expr instanceof ValueVectorReadExpression) { - final TypedFieldId id = ValueVectorReadExpression.class.cast(expr).getFieldId(); - final ValueVector incomingVector = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector(); + TypedFieldId id = ValueVectorReadExpression.class.cast(expr).getFieldId(); + ValueVector incomingVector = incoming.getValueAccessorById( + id.getIntermediateClass(), id.getFieldIds()).getValueVector(); // outputField is taken from the incoming schema to avoid the loss of nested fields // when the first batch will be empty. 
if (incomingVector != null) { @@ -466,7 +459,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { } else { outputField = MaterializedField.create(outputName, expr.getMajorType()); } - final ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); + ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); allocationVectors.add(vector); TypedFieldId fid = container.add(vector); ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true); @@ -482,7 +475,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { container.buildSchema(SelectionVectorMode.NONE); try { - this.flattener = context.getImplementationClass(cg.getCodeGenerator()); + flattener = context.getImplementationClass(cg.getCodeGenerator()); flattener.setup(context, incoming, this, transfers); } catch (ClassTransformationException | IOException e) { throw new SchemaChangeException("Failure while attempting to load generated class", e); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 7c59b4d..ef477b9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -17,8 +17,10 @@ */ package org.apache.drill.exec.physical.impl.join; -import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.sun.codemodel.JClass; import com.sun.codemodel.JConditional; import com.sun.codemodel.JExpr; @@ -74,7 +76,7 @@ import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM; */ public class MergeJoinBatch extends 
AbstractBinaryRecordBatch<MergeJoinPOP> { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinBatch.class); + private static final Logger logger = LoggerFactory.getLogger(MergeJoinBatch.class); private final MappingSet setupMapping = new MappingSet("null", "null", @@ -177,6 +179,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { } allocateBatch(true); + container.setEmpty(); } @Override @@ -269,11 +272,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { } private void setRecordCountInContainer() { - for (VectorWrapper vw : container) { - Preconditions.checkArgument(!vw.isHyper()); - vw.getValueVector().getMutator().setValueCount(getRecordCount()); - } - + container.setValueCount(getRecordCount()); RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext()); batchMemoryManager.updateOutgoingStats(getRecordCount()); } @@ -487,7 +486,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { // Allocate memory for the vectors. // This will iteratively allocate memory for all nested columns underneath. 
int outputRowCount = batchMemoryManager.getOutputRowCount(); - for (VectorWrapper w : container) { + for (VectorWrapper<?> w : container) { RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName()); colSize.allocateVector(w.getValueVector(), outputRowCount); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java index 7513ebd..6b7edd2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java @@ -23,6 +23,8 @@ import java.util.LinkedList; import java.util.Map; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.calcite.rel.core.JoinRelType; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.ErrorCollector; @@ -66,23 +68,23 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector; * RecordBatch implementation for the nested loop join operator */ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoinPOP> { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinBatch.class); + private static final Logger logger = LoggerFactory.getLogger(NestedLoopJoinBatch.class); // Input indexes to correctly update the stats protected static final int LEFT_INPUT = 0; protected static final int RIGHT_INPUT = 1; // Schema on the left side - private BatchSchema leftSchema = null; + private BatchSchema leftSchema; // Schema on the right side - private BatchSchema rightSchema = null; + private BatchSchema rightSchema; // Runtime generated class implementing the NestedLoopJoin interface - private NestedLoopJoin 
nljWorker = null; + private NestedLoopJoin nljWorker; // Number of output records in the current outgoing batch - private int outputRecords = 0; + private int outputRecords; // We accumulate all the batches on the right side in a hyper container. private ExpandableHyperContainer rightContainer = new ExpandableHyperContainer(); @@ -198,12 +200,7 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi outputRecords = nljWorker.outputRecords(popConfig.getJoinType()); // Set the record count - for (final VectorWrapper<?> vw : container) { - vw.getValueVector().getMutator().setValueCount(outputRecords); - } - - // Set the record count in the container - container.setRecordCount(outputRecords); + container.setValueCount(outputRecords); container.buildSchema(BatchSchema.SelectionVectorMode.NONE); RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext()); @@ -413,6 +410,7 @@ public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoi batchMemoryManager.getRecordBatchSizer(LEFT_INDEX), getRecordBatchStatsContext()); container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + container.setEmpty(); } catch (ClassTransformationException | IOException e) { throw new SchemaChangeException(e); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index bb49187..e9d7dd3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -32,12 +32,14 @@ import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static 
org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LimitRecordBatch.class); + private static final Logger logger = LoggerFactory.getLogger(LimitRecordBatch.class); private SelectionVector2 outgoingSv; private SelectionVector2 incomingSv; @@ -122,7 +124,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { container.clear(); transfers.clear(); - for(final VectorWrapper<?> v : incoming) { + for (final VectorWrapper<?> v : incoming) { final TransferPair pair = v.getValueVector().makeTransferPair( container.addOrGet(v.getField(), callBack)); transfers.add(pair); @@ -130,7 +132,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { final BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode(); - switch(svMode) { + switch (svMode) { case NONE: break; case TWO_BYTE: @@ -174,10 +176,11 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { final int inputRecordCount = incoming.getRecordCount(); if (inputRecordCount == 0) { setOutgoingRecordCount(0); + container.setEmpty(); return getFinalOutcome(false); } - for(final TransferPair tp : transfers) { + for (final TransferPair tp : transfers) { tp.transfer(); } // Check if current input record count is less than start offset. If yes then adjust the start offset since we @@ -185,6 +188,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { if (inputRecordCount <= recordStartOffset) { recordStartOffset -= inputRecordCount; setOutgoingRecordCount(0); + container.setEmpty(); } else { // Allocate SV2 vectors for the record count size since we transfer all the vectors buffer from input record // batch to output record batch and later an SV2Remover copies the needed records. 
@@ -194,7 +198,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { // clear memory for incoming sv (if any) if (incomingSv != null) { - outgoingSv.setBatchActualRecordCount(incomingSv.getBatchActualRecordCount()); + int incomingCount = incomingSv.getBatchActualRecordCount(); + outgoingSv.setBatchActualRecordCount(incomingCount); + container.setRecordCount(incomingCount); incomingSv.clear(); } @@ -218,7 +224,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { } int svIndex = 0; - for(int i = recordStartOffset; i < endRecordIndex; svIndex++, i++) { + for (int i = recordStartOffset; i < endRecordIndex; svIndex++, i++) { if (incomingSv != null) { outgoingSv.setIndex(svIndex, incomingSv.getIndex(i)); } else { @@ -226,12 +232,17 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { } } outgoingSv.setRecordCount(svIndex); + outgoingSv.setBatchActualRecordCount(inputRecordCount); + // Actual number of values in the container; not the number in + // the SV. 
+ container.setRecordCount(inputRecordCount); // Update the start offset recordStartOffset = 0; } private void setOutgoingRecordCount(int outputCount) { outgoingSv.setRecordCount(outputCount); + outgoingSv.setBatchActualRecordCount(outputCount); } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java index 639a25a..07f5069 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java @@ -32,6 +32,8 @@ import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Modular implementation of the standard Drill record batch iterator @@ -51,7 +53,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4; */ public class OperatorRecordBatch implements CloseableRecordBatch { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorRecordBatch.class); + static final Logger logger = LoggerFactory.getLogger(OperatorRecordBatch.class); private final OperatorDriver driver; private final BatchAccessor batchAccessor; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java index 496f326..866b988 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java @@ -21,6 +21,8 @@ import org.apache.drill.common.exceptions.UserException; import 
org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.vector.accessor.InvalidConversionError; import org.apache.drill.exec.vector.accessor.UnsupportedConversionError; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Manages a row batch reader through its lifecycle. Created when the reader @@ -128,7 +130,7 @@ import org.apache.drill.exec.vector.accessor.UnsupportedConversionError; */ class ReaderState { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ReaderState.class); + static final Logger logger = LoggerFactory.getLogger(ReaderState.class); private enum State { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java index 701c82b..53664af 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java @@ -25,6 +25,8 @@ import org.apache.drill.exec.physical.impl.protocol.BatchAccessor; import org.apache.drill.exec.physical.impl.protocol.OperatorExec; import org.apache.drill.exec.physical.impl.protocol.VectorContainerAccessor; import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implementation of the revised scan operator that uses a mutator aware of @@ -155,7 +157,7 @@ public class ScanOperatorExec implements OperatorExec { CLOSED } - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanOperatorExec.class); + static final Logger logger = LoggerFactory.getLogger(ScanOperatorExec.class); private final ScanOperatorEvents factory; private final boolean allowEmptyResult; @@ -295,6 +297,13 @@ public class ScanOperatorExec implements OperatorExec { if (allowEmptyResult && containerAccessor.batchCount() == 0 && 
containerAccessor.schemaVersion() > 0) { + + // We've exhausted all readers, none had data, but at least + // one had a schema. Any zero-sized batch produced by a reader + // was cleared when closing the reader. Recreate a valid empty + // batch here to return downstream. + + containerAccessor.getOutgoingContainer().setEmpty(); state = State.EMPTY; } else { state = State.END; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java index 712d21c..095372c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/framework/ManagedScanFramework.java @@ -23,6 +23,8 @@ import org.apache.drill.exec.physical.impl.scan.RowBatchReader; import org.apache.drill.exec.physical.impl.scan.ScanOperatorEvents; import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator; import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.ScanOrchestratorBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Basic scan framework for a "managed" reader which uses the scan schema @@ -112,7 +114,7 @@ import org.apache.drill.exec.physical.impl.scan.project.ScanSchemaOrchestrator.S public class ManagedScanFramework implements ScanOperatorEvents { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ManagedScanFramework.class); + static final Logger logger = LoggerFactory.getLogger(ManagedScanFramework.class); /** * Creates a batch reader on demand. 
Unlike earlier versions of Drill, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java index 5fa2187..8628873 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java @@ -202,10 +202,10 @@ public abstract class ResolvedTuple implements VectorSource { public AbstractMapVector buildMap() { if (parentColumn.sourceIndex() != -1) { - final ResolvedTuple parentTuple = parentColumn.parent(); + ResolvedTuple parentTuple = parentColumn.parent(); inputMap = (AbstractMapVector) parentTuple.vector(parentColumn.sourceIndex()); } - final MaterializedField colSchema = parentColumn.schema(); + MaterializedField colSchema = parentColumn.schema(); outputMap = createMap(inputMap, MaterializedField.create( colSchema.getName(), colSchema.getType()), @@ -275,8 +275,8 @@ public abstract class ResolvedTuple implements VectorSource { // Create a new map array, reusing the offset vector from // the original input map. 
- final RepeatedMapVector source = (RepeatedMapVector) inputMap; - final UInt4Vector offsets = source.getOffsetVector(); + RepeatedMapVector source = (RepeatedMapVector) inputMap; + UInt4Vector offsets = source.getOffsetVector(); valueCount = offsets.getAccessor().getValueCount(); return new RepeatedMapVector(schema, offsets, null); @@ -342,7 +342,7 @@ public abstract class ResolvedTuple implements VectorSource { nullBuilder.build(vectorCache); } if (children != null) { - for (final ResolvedTuple child : children) { + for (ResolvedTuple child : children) { child.buildNulls(vectorCache.childCache(child.name())); } } @@ -353,7 +353,7 @@ public abstract class ResolvedTuple implements VectorSource { nullBuilder.load(rowCount); } if (children != null) { - for (final ResolvedTuple child : children) { + for (ResolvedTuple child : children) { child.loadNulls(innerCardinality(rowCount)); } } @@ -402,7 +402,7 @@ public abstract class ResolvedTuple implements VectorSource { if (children == null) { return; } - for (final ResolvedTuple child : children) { + for (ResolvedTuple child : children) { child.setRowCount(rowCount); } } @@ -426,7 +426,7 @@ public abstract class ResolvedTuple implements VectorSource { nullBuilder.close(); } if (children != null) { - for (final ResolvedTuple child : children) { + for (ResolvedTuple child : children) { child.close(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java index 695e26a..f7e63e4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java @@ -23,7 +23,13 @@ import java.util.Map; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.physical.impl.ScanBatch; import 
org.apache.drill.exec.physical.impl.aggregate.HashAggBatch; +import org.apache.drill.exec.physical.impl.aggregate.StreamingAggBatch; import org.apache.drill.exec.physical.impl.filter.FilterRecordBatch; +import org.apache.drill.exec.physical.impl.filter.RuntimeFilterRecordBatch; +import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch; +import org.apache.drill.exec.physical.impl.join.MergeJoinBatch; +import org.apache.drill.exec.physical.impl.join.NestedLoopJoinBatch; +import org.apache.drill.exec.physical.impl.limit.LimitRecordBatch; import org.apache.drill.exec.physical.impl.limit.PartitionLimitRecordBatch; import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch; import org.apache.drill.exec.physical.impl.protocol.OperatorRecordBatch; @@ -44,6 +50,8 @@ import org.apache.drill.exec.vector.UInt4Vector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VarBinaryVector; import org.apache.drill.exec.vector.VarCharVector; +import org.apache.drill.exec.vector.VarDecimalVector; +import org.apache.drill.exec.vector.VariableWidthVector; import org.apache.drill.exec.vector.ZeroVector; import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector; import org.apache.drill.exec.vector.complex.DictVector; @@ -184,14 +192,20 @@ public class BatchValidator { */ private static Map<Class<? extends CloseableRecordBatch>, CheckMode> buildRules() { Map<Class<? 
extends CloseableRecordBatch>, CheckMode> rules = new IdentityHashMap<>(); - rules.put(OperatorRecordBatch.class, CheckMode.VECTORS); - rules.put(ScanBatch.class, CheckMode.VECTORS); + rules.put(OperatorRecordBatch.class, CheckMode.ZERO_SIZE); + rules.put(ScanBatch.class, CheckMode.ZERO_SIZE); rules.put(ProjectRecordBatch.class, CheckMode.ZERO_SIZE); rules.put(FilterRecordBatch.class, CheckMode.VECTORS); rules.put(PartitionLimitRecordBatch.class, CheckMode.VECTORS); rules.put(UnnestRecordBatch.class, CheckMode.VECTORS); rules.put(HashAggBatch.class, CheckMode.VECTORS); rules.put(RemovingRecordBatch.class, CheckMode.ZERO_SIZE); + rules.put(StreamingAggBatch.class, CheckMode.ZERO_SIZE); + rules.put(RuntimeFilterRecordBatch.class, CheckMode.ZERO_SIZE); + rules.put(FlattenRecordBatch.class, CheckMode.ZERO_SIZE); + rules.put(MergeJoinBatch.class, CheckMode.ZERO_SIZE); + rules.put(NestedLoopJoinBatch.class, CheckMode.ZERO_SIZE); + rules.put(LimitRecordBatch.class, CheckMode.ZERO_SIZE); return rules; } @@ -349,6 +363,8 @@ public class BatchValidator { validateDictVector(name, (DictVector) vector, topLevel); } else if (vector instanceof UnionVector) { validateUnionVector(name, (UnionVector) vector); + } else if (vector instanceof VarDecimalVector) { + validateVarDecimalVector(name, (VarDecimalVector) vector, topLevel); } else { logger.debug("Don't know how to validate vector: {} of class {}", name, vector.getClass().getSimpleName()); @@ -369,20 +385,22 @@ public class BatchValidator { } private void validateVarCharVector(String name, VarCharVector vector, boolean topLevel) { - int valueCount = vector.getAccessor().getValueCount(); + int dataLength = vector.getBuffer().writerIndex(); + validateVarWidthVector(name, vector, dataLength, topLevel); + } - // Disabled because a large number of operators - // set up offset vectors incorrectly. 
- if (valueCount == 0 && (!topLevel || !checkZeroSize)) { - return; - } + private void validateVarBinaryVector(String name, VarBinaryVector vector, boolean topLevel) { + int dataLength = vector.getBuffer().writerIndex(); + validateVarWidthVector(name, vector, dataLength, topLevel); + } + private void validateVarDecimalVector(String name, VarDecimalVector vector, + boolean topLevel) { int dataLength = vector.getBuffer().writerIndex(); - validateOffsetVector(name + "-offsets", vector.getOffsetVector(), false, - valueCount, dataLength, topLevel); + validateVarWidthVector(name, vector, dataLength, topLevel); } - private void validateVarBinaryVector(String name, VarBinaryVector vector, boolean topLevel) { + private void validateVarWidthVector(String name, VariableWidthVector vector, int dataLength, boolean topLevel) { int valueCount = vector.getAccessor().getValueCount(); // Disabled because a large number of operators @@ -391,8 +409,7 @@ public class BatchValidator { return; } - int dataLength = vector.getBuffer().writerIndex(); - validateOffsetVector(name + "-offsets", vector.getOffsetVector(), false, + validateOffsetVector(name + "-offsets", vector.getOffsetVector(), valueCount, dataLength, topLevel); } @@ -401,7 +418,7 @@ public class BatchValidator { int dataLength = dataVector.getAccessor().getValueCount(); int valueCount = vector.getAccessor().getValueCount(); int itemCount = validateOffsetVector(name + "-offsets", vector.getOffsetVector(), - true, valueCount, dataLength, topLevel); + valueCount, dataLength, topLevel); if (dataLength != itemCount) { error(name, vector, String.format( @@ -419,7 +436,7 @@ public class BatchValidator { int valueCount = vector.getAccessor().getValueCount(); int maxBitCount = valueCount * 8; int elementCount = validateOffsetVector(name + "-offsets", - vector.getOffsetVector(), true, valueCount, maxBitCount, topLevel); + vector.getOffsetVector(), valueCount, maxBitCount, topLevel); BitVector dataVector = vector.getDataVector(); if 
(dataVector.getAccessor().getValueCount() != elementCount) { error(name, vector, String.format( @@ -452,7 +469,7 @@ public class BatchValidator { RepeatedMapVector vector, boolean topLevel) { int valueCount = vector.getAccessor().getValueCount(); int elementCount = validateOffsetVector(name + "-offsets", - vector.getOffsetVector(), true, valueCount, Integer.MAX_VALUE, topLevel); + vector.getOffsetVector(), valueCount, Integer.MAX_VALUE, topLevel); for (ValueVector child: vector) { validateVector(elementCount, child, false); } @@ -461,7 +478,7 @@ public class BatchValidator { private void validateDictVector(String name, DictVector vector, boolean topLevel) { int valueCount = vector.getAccessor().getValueCount(); int elementCount = validateOffsetVector(name + "-offsets", - vector.getOffsetVector(), true, valueCount, Integer.MAX_VALUE, topLevel); + vector.getOffsetVector(), valueCount, Integer.MAX_VALUE, topLevel); validateVector(elementCount, vector.getKeys(), false); validateVector(elementCount, vector.getValues(), false); } @@ -470,7 +487,7 @@ public class BatchValidator { RepeatedListVector vector, boolean topLevel) { int valueCount = vector.getAccessor().getValueCount(); int elementCount = validateOffsetVector(name + "-offsets", - vector.getOffsetVector(), true, valueCount, Integer.MAX_VALUE, topLevel); + vector.getOffsetVector(), valueCount, Integer.MAX_VALUE, topLevel); validateVector(elementCount, vector.getDataVector(), false); } @@ -500,16 +517,17 @@ public class BatchValidator { } private int validateOffsetVector(String name, UInt4Vector offsetVector, - boolean repeated, int valueCount, int maxOffset, boolean topLevel) { + int valueCount, int maxOffset, boolean topLevel) { UInt4Vector.Accessor accessor = offsetVector.getAccessor(); int offsetCount = accessor.getValueCount(); // TODO: Disabled because a large number of operators - // set up offset vectors incorrectly. 
-// if (!repeated && offsetCount == 0) { -// System.out.println(String.format( -// "Offset vector for %s: [0] has length 0, expected 1+", -// name)); -// return false; + // set up offset vectors incorrectly. Either that, or semantics + // are ill-defined: some vectors assume an offset vector length + // of 0 if the "outer" value count is zero (which, while fiddly, is + // a workable definition.) +// if (checkZeroSize && topLevel && offsetCount == 0) { +// error(name, offsetVector, +// "Offset vector has length 0, expected 1+"); // } if (valueCount == 0 && offsetCount > 1 || valueCount > 0 && offsetCount != valueCount + 1) { error(name, offsetVector, String.format( @@ -520,7 +538,8 @@ public class BatchValidator { return 0; } - // First value must be zero in current version. + // First value must be zero. (This is why offset vectors have one more + // value than the number of rows.) int prevOffset; try { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java index 4d70065..07f919d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java @@ -30,6 +30,8 @@ import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.accessor.impl.HierarchicalFormatter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implementation of the result set loader. 
Caches vectors @@ -163,7 +165,7 @@ public class ResultSetLoaderImpl implements ResultSetLoader, LoaderInternals { CLOSED } - protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResultSetLoaderImpl.class); + protected static final Logger logger = LoggerFactory.getLogger(ResultSetLoaderImpl.class); /** * Options provided to this loader. diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java index 057c0f2..de8a77e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/nested/TestFastComplexSchema.java @@ -17,8 +17,8 @@ */ package org.apache.drill.exec.nested; -import org.apache.drill.test.BaseTestQuery; import org.apache.drill.categories.UnlikelyTest; +import org.apache.drill.test.BaseTestQuery; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -26,7 +26,6 @@ public class TestFastComplexSchema extends BaseTestQuery { @Test public void test() throws Exception { -// test("select r.r_name, t1.f from cp.`tpch/region.parquet` r join (select flatten(x) as f from (select convert_from('[0, 1]', 'json') as x from cp.`tpch/region.parquet`)) t1 on t1.f = r.r_regionkey"); test("SELECT r.r_name, \n" + " t1.f \n" + "FROM cp.`tpch/region.parquet` r \n" + @@ -111,5 +110,4 @@ public class TestFastComplexSchema extends BaseTestQuery { "(select first_name from cp.`employee.json` where first_name='Sheri')", "Flatten does not support inputs of non-list values"); } - } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java index 07b34c0..7610e71 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java +++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/flatten/TestFlatten.java @@ -28,22 +28,21 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; -import org.apache.drill.test.BaseTestQuery; import org.apache.drill.categories.OperatorTest; -import org.apache.drill.test.TestBuilder; import org.apache.drill.categories.UnlikelyTest; import org.apache.drill.exec.fn.interp.TestConstantFolding; import org.apache.drill.exec.store.easy.json.JSONRecordReader; import org.apache.drill.exec.util.JsonStringHashMap; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.apache.drill.test.BaseTestQuery; import org.apache.drill.test.SubDirTestWatcher; +import org.apache.drill.test.TestBuilder; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; - @Category(OperatorTest.class) public class TestFlatten extends BaseTestQuery { @@ -88,7 +87,6 @@ public class TestFlatten extends BaseTestQuery { .setRecord(jsonRecords) .createFiles(1, numCopies, "json"); - @SuppressWarnings("unchecked") List<JsonStringHashMap<String,Object>> data = Lists.newArrayList( mapOf("uid", 1l, "lst_lst_0", listOf(1l, 2l, 3l, 4l, 5l), @@ -123,7 +121,6 @@ public class TestFlatten extends BaseTestQuery { @Test public void testFlattenReferenceImpl() throws Exception { - @SuppressWarnings("unchecked") List<JsonStringHashMap<String,Object>> data = Lists.newArrayList( mapOf("a",1, "b",2, @@ -133,7 +130,6 @@ public class TestFlatten extends BaseTestQuery { listOf(1000,999) ))); List<JsonStringHashMap<String, Object>> result = flatten(flatten(flatten(data, "list_col"), "nested_list_col"), "nested_list_col"); - @SuppressWarnings("unchecked") List<JsonStringHashMap<String, Object>> expectedResult = Lists.newArrayList( mapOf("nested_list_col", 100, "list_col", 10,"a", 1, "b",2), 
mapOf("nested_list_col", 99, "list_col", 10,"a", 1, "b",2), @@ -195,7 +191,6 @@ public class TestFlatten extends BaseTestQuery { .setRecord(jsonRecord) .createFiles(1, numRecords, "json"); - @SuppressWarnings("unchecked") List<JsonStringHashMap<String,Object>> data = Lists.newArrayList( mapOf("int_list", inputList) ); diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java index 6d408b8..0278749 100644 --- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java +++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java @@ -102,7 +102,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public FieldReader getReader() { return reader; } @Override - public int getBufferSizeFor(final int valueCount) { + public int getBufferSizeFor(int valueCount) { if (valueCount == 0) { return 0; } @@ -121,8 +121,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public Mutator getMutator() { return mutator; } @Override - public void setInitialCapacity(final int valueCount) { - final long size = (long) valueCount * VALUE_WIDTH; + public void setInitialCapacity(int valueCount) { + long size = (long) valueCount * VALUE_WIDTH; // TODO: Replace this with MAX_BUFFER_SIZE once all // code is aware of the maximum vector size. 
if (size > MAX_ALLOCATION_SIZE) { @@ -170,7 +170,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F * if it can't allocate the new buffer */ @Override - public void allocateNew(final int valueCount) { + public void allocateNew(int valueCount) { allocateBytes(valueCount * VALUE_WIDTH); } @@ -182,14 +182,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F super.reset(); } - private void allocateBytes(final long size) { + private void allocateBytes(long size) { // TODO: Replace this with MAX_BUFFER_SIZE once all // code is aware of the maximum vector size. if (size > MAX_ALLOCATION_SIZE) { throw new OversizedAllocationException("Requested amount of memory is more than max allowed allocation size"); } - final int curSize = (int)size; + int curSize = (int)size; clear(); data = allocator.buffer(curSize); data.readerIndex(0); @@ -214,7 +214,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F ? 256 : allocationSizeInBytes * 2L; - final int currentCapacity = data.capacity(); + int currentCapacity = data.capacity(); // Some operations, such as Value Vector#exchange, can be change DrillBuf data field without corresponding allocation size changes. // Check that the size of the allocation is sufficient to copy the old buffer. 
while (newAllocationSize < currentCapacity) { @@ -238,7 +238,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F if (newAllocationSize == 0) { throw new IllegalStateException("Attempt to reAlloc a zero-sized vector"); } - final DrillBuf newBuf = allocator.buffer(newAllocationSize); + DrillBuf newBuf = allocator.buffer(newAllocationSize); newBuf.setBytes(0, data, 0, data.capacity()); newBuf.writerIndex(data.writerIndex()); data.release(1); @@ -259,9 +259,9 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public void load(SerializedField metadata, DrillBuf buffer) { Preconditions.checkArgument(this.field.getName().equals(metadata.getNamePart().getName()), "The field %s doesn't match the provided metadata %s.", this.field, metadata); - final int actualLength = metadata.getBufferLength(); - final int valueCount = metadata.getValueCount(); - final int expectedLength = valueCount * VALUE_WIDTH; + int actualLength = metadata.getBufferLength(); + int valueCount = metadata.getValueCount(); + int expectedLength = valueCount * VALUE_WIDTH; assert actualLength == expectedLength : String.format("Expected to load %d bytes but actually loaded %d bytes", expectedLength, actualLength); clear(); @@ -296,8 +296,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F } public void splitAndTransferTo(int startIndex, int length, ${minor.class}Vector target) { - final int startPoint = startIndex * VALUE_WIDTH; - final int sliceLength = length * VALUE_WIDTH; + int startPoint = startIndex * VALUE_WIDTH; + int sliceLength = length * VALUE_WIDTH; target.clear(); target.data = data.slice(startPoint, sliceLength).transferOwnership(target.allocator).buffer; target.data.writerIndex(sliceLength); @@ -402,14 +402,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F <#if (minor.class == "Interval")> public void get(int index, ${minor.class}Holder holder) { - final int 
offsetIndex = index * VALUE_WIDTH; + int offsetIndex = index * VALUE_WIDTH; holder.months = data.getInt(offsetIndex); holder.days = data.getInt(offsetIndex + ${minor.daysOffset}); holder.milliseconds = data.getInt(offsetIndex + ${minor.millisecondsOffset}); } public void get(int index, Nullable${minor.class}Holder holder) { - final int offsetIndex = index * VALUE_WIDTH; + int offsetIndex = index * VALUE_WIDTH; holder.isSet = 1; holder.months = data.getInt(offsetIndex); holder.days = data.getInt(offsetIndex + ${minor.daysOffset}); @@ -418,30 +418,30 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F @Override public ${friendlyType} getObject(int index) { - final int offsetIndex = index * VALUE_WIDTH; - final int months = data.getInt(offsetIndex); - final int days = data.getInt(offsetIndex + ${minor.daysOffset}); - final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset}); + int offsetIndex = index * VALUE_WIDTH; + int months = data.getInt(offsetIndex); + int days = data.getInt(offsetIndex + ${minor.daysOffset}); + int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset}); return DateUtilities.fromInterval(months, days, millis); } public StringBuilder getAsStringBuilder(int index) { - final int offsetIndex = index * VALUE_WIDTH; - final int months = data.getInt(offsetIndex); - final int days = data.getInt(offsetIndex + ${minor.daysOffset}); - final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset}); + int offsetIndex = index * VALUE_WIDTH; + int months = data.getInt(offsetIndex); + int days = data.getInt(offsetIndex + ${minor.daysOffset}); + int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset}); return DateUtilities.intervalStringBuilder(months, days, millis); } <#elseif (minor.class == "IntervalDay")> public void get(int index, ${minor.class}Holder holder) { - final int offsetIndex = index * VALUE_WIDTH; + int offsetIndex = index * VALUE_WIDTH; holder.days = 
data.getInt(offsetIndex); holder.milliseconds = data.getInt(offsetIndex + ${minor.millisecondsOffset}); } public void get(int index, Nullable${minor.class}Holder holder) { - final int offsetIndex = index * VALUE_WIDTH; + int offsetIndex = index * VALUE_WIDTH; holder.isSet = 1; holder.days = data.getInt(offsetIndex); holder.milliseconds = data.getInt(offsetIndex + ${minor.millisecondsOffset}); @@ -449,16 +449,16 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F @Override public ${friendlyType} getObject(int index) { - final int offsetIndex = index * VALUE_WIDTH; - final int days = data.getInt(offsetIndex); - final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset}); + int offsetIndex = index * VALUE_WIDTH; + int days = data.getInt(offsetIndex); + int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset}); return DateUtilities.fromIntervalDay(days, millis); } public StringBuilder getAsStringBuilder(int index) { - final int offsetIndex = index * VALUE_WIDTH; - final int days = data.getInt(offsetIndex); - final int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset}); + int offsetIndex = index * VALUE_WIDTH; + int days = data.getInt(offsetIndex); + int millis = data.getInt(offsetIndex + ${minor.millisecondsOffset}); return DateUtilities.intervalDayStringBuilder(days, millis); } <#elseif minor.class == "Decimal28Sparse" || minor.class == "Decimal38Sparse" || minor.class == "Decimal28Dense" || minor.class == "Decimal38Dense"> @@ -551,7 +551,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F @Override public ${friendlyType} getObject(int index) { - final BigInteger value = BigInteger.valueOf(((${type.boxedType})get(index)).${type.javaType}Value()); + BigInteger value = BigInteger.valueOf(((${type.boxedType})get(index)).${type.javaType}Value()); return new BigDecimal(value, getField().getScale()); } <#else> diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java index 9f0b1d5..1ca1f5e 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java @@ -215,8 +215,8 @@ public class OffsetVectorWriterImpl extends AbstractFixedWidthWriter implements // This is performance critical code; every operation counts. // Please be thoughtful when changing the code. - final int valueIndex = prepareFill(); - final int fillCount = valueIndex - lastWriteIndex - 1; + int valueIndex = prepareFill(); + int fillCount = valueIndex - lastWriteIndex - 1; if (fillCount > 0) { fillEmpties(fillCount); } @@ -228,7 +228,7 @@ public class OffsetVectorWriterImpl extends AbstractFixedWidthWriter implements } public final int prepareFill() { - final int valueIndex = vectorIndex.vectorIndex(); + int valueIndex = vectorIndex.vectorIndex(); if (valueIndex + 1 < capacity) { return valueIndex; } @@ -240,32 +240,32 @@ public class OffsetVectorWriterImpl extends AbstractFixedWidthWriter implements } @Override - protected final void fillEmpties(final int fillCount) { + protected final void fillEmpties(int fillCount) { for (int i = 0; i < fillCount; i++) { fillOffset(nextOffset); } } @Override - public final void setNextOffset(final int newOffset) { - final int writeIndex = prepareWrite(); + public final void setNextOffset(int newOffset) { + int writeIndex = prepareWrite(); drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset); nextOffset = newOffset; } - public final void reviseOffset(final int newOffset) { - final int writeIndex = vectorIndex.vectorIndex() + 1; + public final void reviseOffset(int newOffset) { + int writeIndex = vectorIndex.vectorIndex() + 1; drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset); 
nextOffset = newOffset; } - public final void fillOffset(final int newOffset) { + public final void fillOffset(int newOffset) { drillBuf.setInt((++lastWriteIndex + 1) * VALUE_WIDTH, newOffset); nextOffset = newOffset; } @Override - public final void setValue(final Object value) { + public final void setValue(Object value) { throw new InvalidConversionError( "setValue() not supported for the offset vector writer: " + value); } @@ -298,19 +298,22 @@ public class OffsetVectorWriterImpl extends AbstractFixedWidthWriter implements @Override public void postRollover() { - final int newNext = nextOffset - rowStartOffset; + int newNext = nextOffset - rowStartOffset; super.postRollover(); nextOffset = newNext; } @Override public void setValueCount(int valueCount) { - mandatoryResize(valueCount); - // Value count is in row positions. + // Value count is in row positions, not index + // positions. (There are one more index positions + // than row positions.) + int offsetCount = valueCount + 1; + mandatoryResize(offsetCount); fillEmpties(valueCount - lastWriteIndex - 1); - vector().getBuffer().writerIndex((valueCount + 1) * VALUE_WIDTH); + vector().getBuffer().writerIndex(offsetCount * VALUE_WIDTH); } @Override diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java index f19709d..046641b 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/BaseRepeatedValueVector.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.vector.complex; -import io.netty.buffer.DrillBuf; - import java.util.Collections; import java.util.Iterator; import java.util.Set; @@ -27,8 +25,8 @@ import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; import 
org.apache.drill.exec.exception.SchemaChangeRuntimeException; import org.apache.drill.exec.expr.BasicTypeHelper; -import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.AllocationManager.BufferLedger; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.TransferPair; @@ -38,10 +36,11 @@ import org.apache.drill.exec.vector.UInt4Vector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VectorDescriptor; import org.apache.drill.exec.vector.ZeroVector; - import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.collect.ObjectArrays; +import io.netty.buffer.DrillBuf; + public abstract class BaseRepeatedValueVector extends BaseValueVector implements RepeatedValueVector { public final static ValueVector DEFAULT_DATA_VECTOR = ZeroVector.INSTANCE; @@ -349,6 +348,12 @@ public abstract class BaseRepeatedValueVector extends BaseValueVector implements @Override public void setValueCount(int valueCount) { // TODO: populate offset end points + // Convention seems to be that if the "outer" value count + // is zero, then the offset vector can also be zero-length: + // the normal initial zero position is omitted. While this + // saves a bit of memory, it greatly complicates code that + // works with vectors because of the special case for zero-length + // vectors. offsets.getMutator().setValueCount(valueCount == 0 ? 0 : valueCount+1); final int childValueCount = valueCount == 0 ? 0 : offsets.getAccessor().get(valueCount); vector.getMutator().setValueCount(childValueCount);