This is an automated email from the ASF dual-hosted git repository. progers pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new 17fdca1 DRILL-7724: Refactor metadata controller batch 17fdca1 is described below commit 17fdca1693773e94fd8b0706f4eee5e774299fa6 Author: Paul Rogers <par0...@yahoo.com> AuthorDate: Thu Apr 30 14:16:22 2020 -0700 DRILL-7724: Refactor metadata controller batch Also changed for (;;) infinite loops to while (true) as preferred by IntelliJ. --- .../drill/exec/store/hive/HiveStoragePlugin.java | 2 +- .../org/apache/drill/yarn/core/YarnRMClient.java | 8 +- .../exec/physical/config/StreamingAggregate.java | 3 - .../exec/physical/impl/aggregate/HashAggBatch.java | 6 +- .../physical/impl/aggregate/StreamingAggBatch.java | 31 +++-- .../impl/aggregate/StreamingAggTemplate.java | 9 +- .../impl/metadata/MetadataControllerBatch.java | 143 +++++++++------------ .../exec/physical/impl/scan/ScanOperatorExec.java | 2 +- .../physical/impl/xsort/ExternalSortBatch.java | 2 +- .../drill/exec/physical/impl/xsort/SortImpl.java | 2 +- .../drill/exec/planner/physical/AggPrelBase.java | 34 ++--- .../apache/drill/exec/record/VectorContainer.java | 10 ++ .../exec/store/easy/json/parser/ArrayParser.java | 2 +- .../store/easy/json/parser/DummyValueParser.java | 2 +- .../easy/json/parser/JsonStructureParser.java | 6 +- .../store/easy/json/parser/JsonValueParser.java | 4 +- .../exec/store/easy/json/parser/ObjectParser.java | 3 +- .../easy/json/parser/SimpleMessageParser.java | 2 +- .../work/foreman/rm/DistributedQueryQueue.java | 8 +- .../resultSet/impl/TestResultSetCopier.java | 2 +- .../drill/exec/physical/rowSet/TestRowSet.java | 2 +- .../drill/exec/server/rest/InteractiveUI.java | 2 +- .../java/org/apache/drill/test/ClientFixture.java | 8 +- .../java/org/apache/drill/test/QueryBuilder.java | 3 +- .../java/org/apache/drill/test/QueryResultSet.java | 4 +- .../org/apache/drill/test/QueryRowSetIterator.java | 2 +- 26 files changed, 145 insertions(+), 157 deletions(-) diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java index 4e7fde9..5f7061e 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java @@ -146,7 +146,7 @@ public class HiveStoragePlugin extends AbstractStoragePlugin { } catch (Throwable e) { // Unwrap exception Throwable ex = e; - for (;;) { + while (true) { // Case for failing on an invalid cached connection if (ex instanceof MetaException || // Case for a timed-out impersonated connection, and diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java index 8905ce3..d388d5c 100644 --- a/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/YarnRMClient.java @@ -42,8 +42,8 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; */ public class YarnRMClient { - private YarnConfiguration conf; - private YarnClient yarnClient; + private final YarnConfiguration conf; + private final YarnClient yarnClient; /** * Application ID. Semantics are such that each session of Drill-on-YARN works @@ -126,7 +126,7 @@ public class YarnRMClient { ApplicationReport appReport; YarnApplicationState appState; ApplicationAttemptId attemptId; - for (;;) { + while (true) { appReport = getAppReport(); appState = appReport.getYarnApplicationState(); attemptId = appReport.getCurrentApplicationAttemptId(); @@ -160,7 +160,7 @@ public class YarnRMClient { public void waitForCompletion() throws YarnClientException { ApplicationReport appReport; YarnApplicationState appState; - for (;;) { + while (true) { appReport = getAppReport(); appState = appReport.getYarnApplicationState(); if (appState == YarnApplicationState.FINISHED diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StreamingAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StreamingAggregate.java index 91956e0..3d074bf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StreamingAggregate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StreamingAggregate.java @@ -32,8 +32,6 @@ import java.util.List; @JsonTypeName("streaming-aggregate") public class StreamingAggregate extends AbstractSingle { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggregate.class); - private final List<NamedExpression> keys; private final List<NamedExpression> exprs; @@ -68,5 +66,4 @@ public class StreamingAggregate extends AbstractSingle { public int getOperatorType() { return CoreOperatorType.STREAMING_AGGREGATE_VALUE; } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index 2f0c9e2..cc1494d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -70,7 +70,6 @@ import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.util.record.RecordBatchStats; import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; -import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.ValueVector; @@ -256,10 +255,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { incomingSchema = incoming.getSchema(); createAggregator(); - for (VectorWrapper<?> w : container) { - AllocationHelper.allocatePrecomputedChildCount(w.getValueVector(), 0, 0, 0); - } - container.setEmpty(); + container.allocatePrecomputedChildCount(0, 0, 0); if (incoming.getRecordCount() > 0) { hashAggMemoryManager.update(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index 7886a5e..b1e332e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -219,7 +219,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { if (first && getKeyExpressions().size() == 0) { // if we have a straight aggregate and empty input batch, we need to handle it in a different way - // Wewant to produce the special batch only if we got a NONE as the first outcome after + // We want to produce the special batch only if we got a NONE as the first outcome after // OK_NEW_SCHEMA. If we get a NONE immediately after we see an EMIT, then we have already handled // the case of the empty batch constructSpecialBatch(); @@ -259,7 +259,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { && aggregator.previousBatchProcessed()) { lastKnownOutcome = incoming.next(); if (!first ) { - //Setup needs to be called again. During setup, generated code saves a reference to the vectors + // Setup needs to be called again. During setup, generated code saves a reference to the vectors // pointed to by the incoming batch so that the de-referencing of the vector wrappers to get to // the vectors does not have to be done at each call to eval. However, after an EMIT is seen, // the vectors are replaced and the reference to the old vectors is no longer valid @@ -290,7 +290,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { ExternalSortBatch.releaseBatches(incoming); return returnOutcome; case RETURN_AND_RESET: - //WE could have got a string of batches, all empty, until we hit an emit + // We could have got a string of batches, all empty, until we hit an emit if (firstBatchForDataSet && getKeyExpressions().size() == 0 && recordCount == 0) { // if we have a straight aggregate and empty input batch, we need to handle it in a different way constructSpecialBatch(); @@ -371,10 +371,12 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } /** - * Method is invoked when we have a straight aggregate (no group by expression) and our input is empty. - * In this case we construct an outgoing batch with record count as 1. For the nullable vectors we don't set anything - * as we want the output to be NULL. For the required vectors (only for count()) we set the value to be zero since - * we don't zero out our buffers initially while allocating them. + * Invoked when we have a straight aggregate (no group by expression) and our + * input is empty. In this case we construct an outgoing batch with record + * count as 1. For the nullable vectors we don't set anything as we want the + * output to be NULL. For the required vectors (only for count()) we set the + * value to be zero since we don't zero out our buffers initially while + * allocating them. */ private void constructSpecialBatch() { int exprIndex = 0; @@ -426,9 +428,15 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { protected StreamingAggregator createAggregatorInternal() { ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions()); - cg.getCodeGenerator().plainJavaCapable(true); + // Streaming agg no longer plain Java capable. Stats generates code + // that fails when compiled normally. + // cannot override resetValues() in org.apache.drill.exec.physical.impl.aggregate.StreamingAggTemplate + // public boolean resetValues() + // ^ + // overridden method does not throw org.apache.drill.exec.exception.SchemaChangeException (compiler.err.override.meth.doesnt.throw) + // cg.getCodeGenerator().plainJavaCapable(true); // Uncomment out this line to debug the generated code. - //cg.getCodeGenerator().saveCodeForDebugging(true); + // cg.getCodeGenerator().saveCodeForDebugging(true); container.clear(); LogicalExpression[] keyExprs = new LogicalExpression[getKeyExpressions().size()]; @@ -460,7 +468,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { continue; } - /* Populate the complex writers for complex exprs */ + // Populate the complex writers for complex exprs if (expr instanceof DrillFuncHolderExpr && ((DrillFuncHolderExpr) expr).getHolder().isComplexWriterFuncHolder()) { // Need to process ComplexWriter function evaluation. @@ -651,7 +659,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { @Override public void dump() { - logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, incomingSchema={}]", container, popConfig, aggregator, incomingSchema); + logger.error("StreamingAggBatch[container={}, popConfig={}, aggregator={}, incomingSchema={}]", + container, popConfig, aggregator, incomingSchema); } @VisibleForTesting diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java index 4a6822d..c97e19e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java @@ -173,15 +173,15 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { return AggOutcome.CLEANUP_AND_RETURN; } - outside: while(true) { + outside: while (true) { // loop through existing records, adding as necessary. - if(!processRemainingRecordsInBatch()) { + if (!processRemainingRecordsInBatch()) { // output batch is full. Return. return setOkAndReturn(outerOutcome); } // if the current batch came with an EMIT, we're done since if we are here it means output batch consumed all // the rows in incoming batch - if(outerOutcome == EMIT) { + if (outerOutcome == EMIT) { // output the last record outputToBatch(previousIndex); resetIndex(); @@ -492,8 +492,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { } @Override - public void cleanup() { - } + public void cleanup() { } @Override public String toString() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java index 1bcb1ca..dce5822 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/metadata/MetadataControllerBatch.java @@ -36,6 +36,7 @@ import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.metastore.ColumnNamesOptions; import org.apache.drill.exec.metastore.analyze.AnalyzeColumnUtils; +import org.apache.drill.exec.metastore.analyze.MetadataControllerContext; import org.apache.drill.exec.metastore.analyze.MetadataIdentifierUtils; import org.apache.drill.exec.metastore.analyze.MetastoreAnalyzeConstants; import org.apache.drill.exec.ops.FragmentContext; @@ -105,18 +106,16 @@ import org.slf4j.LoggerFactory; public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataControllerPOP> { private static final Logger logger = LoggerFactory.getLogger(MetadataControllerBatch.class); + enum State { RIGHT, LEFT, WRITE, FINISHED } + private final Tables tables; private final TableInfo tableInfo; private final Map<String, MetadataInfo> metadataToHandle; - private final StatisticsRecordCollector statisticsCollector; - private final List<TableMetadataUnit> metadataUnits; + private final StatisticsRecordCollector statisticsCollector = new StatisticsCollectorImpl(); + private final List<TableMetadataUnit> metadataUnits = new ArrayList<>(); private final ColumnNamesOptions columnNamesOptions; - private boolean firstLeft = true; - private boolean firstRight = true; - private boolean finished; - private boolean finishedRight; - private int recordCount; + private State state = State.RIGHT; protected MetadataControllerBatch(MetadataControllerPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException { @@ -127,113 +126,93 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC ? null : popConfig.getContext().metadataToHandle().stream() .collect(Collectors.toMap(MetadataInfo::identifier, Function.identity())); - this.metadataUnits = new ArrayList<>(); - this.statisticsCollector = new StatisticsCollectorImpl(); this.columnNamesOptions = new ColumnNamesOptions(context.getOptions()); } - protected boolean setupNewSchema() { - container.clear(); - container.addOrGet(MetastoreAnalyzeConstants.OK_FIELD_NAME, Types.required(TypeProtos.MinorType.BIT), null); - container.addOrGet(MetastoreAnalyzeConstants.SUMMARY_FIELD_NAME, Types.required(TypeProtos.MinorType.VARCHAR), null); - container.buildSchema(BatchSchema.SelectionVectorMode.NONE); - container.setEmpty(); - return true; - } - @Override public IterOutcome innerNext() { - IterOutcome outcome; - boolean finishedLeft; - if (finished) { - return IterOutcome.NONE; - } + while (state != State.FINISHED) { + switch (state) { + case RIGHT: { - if (!finishedRight) { - outcome = handleRightIncoming(); - if (outcome != null) { - return outcome; + // Can only return NOT_YET + IterOutcome outcome = handleRightIncoming(); + if (outcome != null) { + return outcome; + } + break; + } + case LEFT: { + + // Can only return NOT_YET + IterOutcome outcome = handleLeftIncoming(); + if (outcome != null) { + return outcome; + } + break; + } + case WRITE: + writeToMetastore(); + createSummary(); + state = State.FINISHED; + return IterOutcome.OK_NEW_SCHEMA; + + case FINISHED: + break; + + default: + throw new IllegalStateException(state.name()); } } + return IterOutcome.NONE; + } + private IterOutcome handleRightIncoming() { outer: while (true) { - outcome = next(0, left); + IterOutcome outcome = next(0, right); switch (outcome) { case NONE: - // all incoming data was processed when returned OK_NEW_SCHEMA - finishedLeft = !firstLeft; + state = State.LEFT; break outer; case NOT_YET: return outcome; case OK_NEW_SCHEMA: - if (firstLeft) { - firstLeft = false; - if (!setupNewSchema()) { - outcome = IterOutcome.OK; - } - IterOutcome out = handleLeftIncoming(); - if (out != IterOutcome.OK) { - return out; - } - return outcome; - } - //fall through case OK: - assert !firstLeft : "First batch should be OK_NEW_SCHEMA"; - IterOutcome out = handleLeftIncoming(); - if (out != IterOutcome.OK) { - return out; - } + appendStatistics(statisticsCollector); break; default: throw new UnsupportedOperationException("Unsupported upstream state " + outcome); } } - - if (finishedLeft) { - IterOutcome out = writeToMetastore(); - finished = true; - return out; - } - return outcome; + return null; } - private IterOutcome handleRightIncoming() { - IterOutcome outcome; - outer: + private IterOutcome handleLeftIncoming() { while (true) { - outcome = next(0, right); + IterOutcome outcome = next(0, left); switch (outcome) { case NONE: - // all incoming data was processed - finishedRight = true; - break outer; + // all incoming data was processed when returned OK_NEW_SCHEMA + state = State.WRITE; + return null; case NOT_YET: return outcome; case OK_NEW_SCHEMA: - firstRight = false; - //fall through case OK: - assert !firstRight : "First batch should be OK_NEW_SCHEMA"; - appendStatistics(statisticsCollector); + metadataUnits.addAll(getMetadataUnits(left.getContainer())); break; default: throw new UnsupportedOperationException("Unsupported upstream state " + outcome); } } - return null; - } - - private IterOutcome handleLeftIncoming() { - metadataUnits.addAll(getMetadataUnits(left.getContainer())); - return IterOutcome.OK; } - private IterOutcome writeToMetastore() { - FilterExpression deleteFilter = popConfig.getContext().tableInfo().toFilter(); + private void writeToMetastore() { + MetadataControllerContext mdContext = popConfig.getContext(); + FilterExpression deleteFilter = mdContext.tableInfo().toFilter(); - for (MetadataInfo metadataInfo : popConfig.getContext().metadataToRemove()) { + for (MetadataInfo metadataInfo : mdContext.metadataToRemove()) { deleteFilter = FilterExpression.and(deleteFilter, FilterExpression.equal(MetastoreColumn.METADATA_KEY, metadataInfo.key())); } @@ -246,8 +225,7 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC .build()); } - MetastoreTableInfo metastoreTableInfo = popConfig.getContext().metastoreTableInfo(); - + MetastoreTableInfo metastoreTableInfo = mdContext.metastoreTableInfo(); if (tables.basicRequests().hasMetastoreTableInfoChanged(metastoreTableInfo)) { throw UserException.executionError(null) .message("Metadata for table [%s] was changed before analyze is finished", tableInfo.name()) @@ -256,7 +234,10 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC modify.overwrite(metadataUnits) .execute(); + } + private void createSummary() { + container.clear(); BitVector bitVector = container.addOrGet(MetastoreAnalyzeConstants.OK_FIELD_NAME, Types.required(TypeProtos.MinorType.BIT), null); VarCharVector varCharVector = @@ -272,11 +253,8 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC popConfig.getContext().tableInfo().workspace(), popConfig.getContext().tableInfo().name()).getBytes()); - bitVector.getMutator().setValueCount(1); - varCharVector.getMutator().setValueCount(1); - container.setRecordCount(++recordCount); - - return IterOutcome.OK; + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + container.setValueCount(1); } private List<TableMetadataUnit> getMetadataUnits(VectorContainer container) { @@ -696,7 +674,6 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC default: return objectReader.getObject(); } - } private Set<Path> getIncomingLocations(TupleReader reader) { @@ -748,6 +725,6 @@ public class MetadataControllerBatch extends AbstractBinaryRecordBatch<MetadataC @Override public int getRecordCount() { - return recordCount; + return container.getRecordCount(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java index 5081b27..b6c77ca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java @@ -246,7 +246,7 @@ public class ScanOperatorExec implements OperatorExec { } private void nextAction(boolean readSchema) { - for (;;) { + while (true) { // If have a reader, read a batch if (readerState != null) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 42ea36c..915c1f4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -396,7 +396,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { // Loop over all input batches IterOutcome result = OK; - loop: for (;;) { + loop: while (true) { result = loadBatch(); switch (result) { case NONE: diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SortImpl.java index e037600..cc8f6bb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SortImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SortImpl.java @@ -539,7 +539,7 @@ public class SortImpl { // a single last pass. loop: - for (;;) { + while (true) { MergeTask task = memManager.consolidateBatches( allocator.getAllocatedMemory(), bufferedBatches.size(), diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java index 97c60f3..00ab539 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java @@ -48,35 +48,37 @@ import java.util.List; public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel { public enum OperatorPhase { - PHASE_1of1(false, false, false, "Single"), - PHASE_1of2(true, true, false, "1st"), - PHASE_2of2(true, false, true, "2nd"); - private boolean hasTwo; - private boolean is1st; - private boolean is2nd; + // Single phase aggregate + PHASE_1of1("Single"), + + // Distributed aggregate: partitioned first phase + PHASE_1of2("1st"), + + // Distibuted aggregate: non-partitioned overall aggregation + // phase + PHASE_2of2("2nd"); + private String name; - OperatorPhase(boolean hasTwo, - boolean is1st, - boolean is2nd, - String name) { - this.hasTwo = hasTwo; - this.is1st = is1st; - this.is2nd = is2nd; + OperatorPhase(String name) { this.name = name; } public boolean hasTwo() { - return hasTwo; + return this != PHASE_1of1; } public boolean is1st() { - return is1st; + return this == PHASE_1of2; } public boolean is2nd() { - return is2nd; + return this == PHASE_2of2; + } + + public boolean isFinal() { + return this != PHASE_1of2; } public String getName() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index 4ec0b8d..17d7b53 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -30,6 +30,7 @@ import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.SchemaChangeCallBack; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; @@ -573,4 +574,13 @@ public class VectorContainer implements VectorAccessible { add(TypeHelper.getNewVector(field, allocator)); } } + + public void allocatePrecomputedChildCount(int valueCount, + int bytesPerValue, int childValCount) { + for (VectorWrapper<?> w : wrappers) { + AllocationHelper.allocatePrecomputedChildCount(w.getValueVector(), + valueCount, bytesPerValue, childValCount); + } + setEmpty(); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ArrayParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ArrayParser.java index 963b8c5..d6f5912 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ArrayParser.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ArrayParser.java @@ -54,7 +54,7 @@ public class ArrayParser extends AbstractElementParser { @Override public void parse(TokenIterator tokenizer) { arrayListener.onStart(); - top: for (;;) { + top: while (true) { // Position: [ (value, )* ^ ? JsonToken token = tokenizer.requireNext(); switch (token) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/DummyValueParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/DummyValueParser.java index 7d5131b..d705f00 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/DummyValueParser.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/DummyValueParser.java @@ -55,7 +55,7 @@ class DummyValueParser extends AbstractElementParser { public void parseTail(TokenIterator tokenizer) { // Parse (field: value)* } - for (;;) { + while (true) { JsonToken token = tokenizer.requireNext(); switch (token) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java index 23ff3e1..4d46a46 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureParser.java @@ -240,7 +240,7 @@ public class JsonStructureParser { // Only occurs for an empty document return false; } - for (;;) { + while (true) { try { return rootState.parseRoot(tokenizer); } catch (RecoverableJsonException e) { @@ -264,8 +264,8 @@ public class JsonStructureParser { private boolean recover() { logger.warn("Attempting recovery from JSON syntax error. " + tokenizer.context()); boolean firstAttempt = true; - for (;;) { - for (;;) { + while (true) { + while (true) { try { if (parser.isClosed()) { throw errorFactory().unrecoverableError(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonValueParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonValueParser.java index 15fc128..8819e2c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonValueParser.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonValueParser.java @@ -74,7 +74,7 @@ public class JsonValueParser extends AbstractElementParser { // Accept value* ] boolean first = true; - for (;;) { + while (true) { JsonToken token = tokenizer.requireNext(); if (token == JsonToken.END_ARRAY) { json.append(tokenizer.textValue()); @@ -93,7 +93,7 @@ public class JsonValueParser extends AbstractElementParser { // Accept (field: value)* } boolean first = true; - for (;;) { + while (true) { JsonToken token = tokenizer.requireNext(); if (token == JsonToken.END_OBJECT) { json.append(tokenizer.textValue()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ObjectParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ObjectParser.java index 226cc29..ce331a3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ObjectParser.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/ObjectParser.java @@ -101,8 +101,7 @@ public class ObjectParser extends AbstractElementParser { listener.onStart(); // Parse (field: value)* } - - top: for (;;) { + top: while (true) { JsonToken token = tokenizer.requireNext(); // Position: { (key: value)* ? ^ switch (token) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java index a5181f1..b93f29e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/SimpleMessageParser.java @@ -87,7 +87,7 @@ public class SimpleMessageParser implements MessageParser { } private boolean parseToElement(TokenIterator tokenizer, int level) throws MessageContextException { - for (;;) { + while (true) { JsonToken token = tokenizer.requireNext(); switch (token) { case FIELD_NAME: diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java index 58d8416..7f2e060 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java @@ -77,7 +77,7 @@ public class DistributedQueryQueue implements QueryQueue { * as the user changes system options. This value captures the value * calculated at the time that this lease was granted. */ - private long queryMemory; + private final long queryMemory; public DistributedQueueLease(QueryId queryId, String queueName, DistributedLease lease, long queryMemory) { @@ -197,9 +197,9 @@ public class DistributedQueryQueue implements QueryQueue { } private long memoryPerNode; - private SystemOptionManager optionManager; + private final SystemOptionManager optionManager; private ConfigSet configSet; - private ClusterCoordinator clusterCoordinator; + private final ClusterCoordinator clusterCoordinator; private long nextRefreshTime; private long memoryPerSmallQuery; private long memoryPerLargeQuery; @@ -335,7 +335,7 @@ public class DistributedQueryQueue implements QueryQueue { private void release(QueueLease lease) { DistributedQueueLease theLease = (DistributedQueueLease) lease; - for (;;) { + while (true) { try { theLease.lease.close(); theLease.lease = null; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java index 646c3f4..13780de 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/resultSet/impl/TestResultSetCopier.java @@ -412,7 +412,7 @@ public class TestResultSetCopier extends SubOperatorTest { // Equivalent of an entire operator run int start = 1; - for (;;) { + while (true) { // Equivalent of operator next() method diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java index 975811e..da4705b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/TestRowSet.java @@ -1220,7 +1220,7 @@ public class TestRowSet extends SubOperatorTest { // away the last row because the row set abstraction does not // implement vector overflow other than throwing an exception. - for (;;) { + while (true) { writer.scalar(0).setInt(count); writer.scalar(1).setString(varCharValue); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/InteractiveUI.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/InteractiveUI.java index 1e2617b..5189db8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/InteractiveUI.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/InteractiveUI.java @@ -32,7 +32,7 @@ public class InteractiveUI extends ClusterTest { builder.configBuilder().put(ExecConstants.HTTP_ENABLE, true); try { startCluster(builder); - for (;;) { + while (true) { Thread.sleep(1000); } } catch (Exception e) { diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java index bbf684e..442fd8e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClientFixture.java @@ -94,7 +94,7 @@ public class ClientFixture implements AutoCloseable { } } - private ClusterFixture cluster; + private final ClusterFixture cluster; private DrillClient client; public ClientFixture(ClientBuilder builder) throws RpcException { @@ -326,7 +326,7 @@ public class ClientFixture implements AutoCloseable { public String parseNext() throws IOException { boolean eof = false; StringBuilder buf = new StringBuilder(); - for (;;) { + while (true) { int c = in.read(); if (c == -1) { eof = true; @@ -339,7 +339,7 @@ public class ClientFixture implements AutoCloseable { if (c == '"' || c == '\'' || c == '`') { int quote = c; boolean escape = false; - for (;;) { + while (true) { c = in.read(); if (c == -1) { throw new IllegalArgumentException("Mismatched quote: " + (char) c); @@ -363,7 +363,7 @@ public class ClientFixture implements AutoCloseable { public int exec(Reader in) throws IOException { StatementParser parser = new StatementParser(in); int count = 0; - for (;;) { + while (true) { String stmt = parser.parseNext(); if (stmt == null) { logger.debug("----"); diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java index 6b0641d..4828f6c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java @@ -709,8 +709,7 @@ public class QueryBuilder { int batchCount = 0; QueryId queryId = null; QueryState state; - loop: - for (;;) { + loop: while (true) { QueryEvent event = listener.get(); switch (event.type) { diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java index f3c436d..3a3fe67 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java @@ -32,7 +32,7 @@ import org.apache.drill.exec.physical.rowSet.RowSet; */ public class QueryResultSet { - private BufferingQueryEventListener listener; + private final BufferingQueryEventListener listener; private boolean eof; private int recordCount = 0; private int batchCount = 0; @@ -58,7 +58,7 @@ public class QueryResultSet { if (eof) { return null; } - for (;;) { + while (true) { QueryEvent event = listener.get(); switch (event.type) { diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java index c8d8459..acc5056 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryRowSetIterator.java @@ -52,7 +52,7 @@ public class QueryRowSetIterator implements Iterator<DirectRowSet>, Iterable<Dir @Override public boolean hasNext() { - for (;;) { + while (true) { QueryEvent event = listener.get(); state = event.state; batch = null;