DRILL-5832: Change OperatorFixture to use system option manager - Rename FixtureBuilder to ClusterFixtureBuilder - Provide alternative way to reset system/session options - Fix for DRILL-5833: random failure in TestParquetWriter - Provide strict, but clear, errors for missing options
closes #970 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/42fc11e5 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/42fc11e5 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/42fc11e5 Branch: refs/heads/master Commit: 42fc11e53557477ac01c7dd31c3aa93e22fb4384 Parents: 3036d37 Author: Paul Rogers <prog...@maprtech.com> Authored: Mon Oct 2 15:45:21 2017 -0700 Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com> Committed: Mon Nov 13 14:59:19 2017 +0200 ---------------------------------------------------------------------- .../drill/common/util/CoreDecimalUtility.java | 9 +- .../apache/drill/exec/hive/TestHiveStorage.java | 7 +- .../drill/exec/ops/BufferManagerImpl.java | 5 +- .../drill/exec/physical/impl/ImplCreator.java | 3 + .../physical/impl/MergingReceiverCreator.java | 5 +- .../drill/exec/physical/impl/ScanBatch.java | 24 +- .../drill/exec/physical/impl/ScreenCreator.java | 6 +- .../drill/exec/physical/impl/TraceInjector.java | 6 +- .../exec/physical/impl/WriterRecordBatch.java | 2 + .../exec/server/options/BaseOptionManager.java | 76 ++++- .../exec/server/options/BaseOptionSet.java | 57 ---- .../drill/exec/server/options/OptionSet.java | 44 +++ .../exec/server/options/OptionValidator.java | 1 - .../drill/exec/server/options/OptionValue.java | 4 + .../server/options/PersistedOptionValue.java | 1 + .../server/options/SystemOptionManager.java | 26 +- .../drill/exec/store/dfs/FileSelection.java | 12 - .../drill/exec/store/dfs/FileSystemConfig.java | 3 +- .../exec/store/dfs/FileSystemSchemaFactory.java | 3 +- .../drill/exec/store/dfs/ReadEntryFromHDFS.java | 4 +- .../store/parquet/ParquetScanBatchCreator.java | 3 +- .../parquet/columnreaders/ParquetSchema.java | 2 +- .../parquet2/DrillParquetGroupConverter.java | 33 +- .../exec/store/parquet2/DrillParquetReader.java | 39 +-- .../DrillParquetRecordMaterializer.java | 6 +- .../store/provider/InMemoryStoreProvider.java | 11 +- 
.../java/org/apache/drill/BaseTestQuery.java | 35 +- .../physical/impl/agg/TestHashAggrSpill.java | 4 +- .../impl/limit/TestLimitWithExchanges.java | 9 +- .../impl/validate/TestValidationOptions.java | 8 +- .../physical/impl/writer/TestParquetWriter.java | 113 ++++--- .../writer/TestParquetWriterEmptyFiles.java | 3 +- .../impl/xsort/TestSimpleExternalSort.java | 10 +- .../impl/xsort/TestSortSpillWithException.java | 4 +- .../impl/xsort/managed/SortTestUtilities.java | 1 + .../impl/xsort/managed/TestShortArrays.java | 2 +- .../exec/server/TestDrillbitResilience.java | 33 +- .../exec/server/rest/StatusResourcesTest.java | 6 +- .../store/parquet/ParquetInternalsTest.java | 4 +- .../parquet/TestParquetFilterPushDown.java | 8 +- .../exec/store/sys/TestPStoreProviders.java | 4 +- .../drill/exec/util/TestQueryMemoryAlloc.java | 18 +- .../complex/writer/TestExtendedTypes.java | 18 +- .../org/apache/drill/test/ClientFixture.java | 10 +- .../org/apache/drill/test/ClusterFixture.java | 26 +- .../drill/test/ClusterFixtureBuilder.java | 326 +++++++++++++++++++ .../java/org/apache/drill/test/ClusterTest.java | 2 +- .../java/org/apache/drill/test/ExampleTest.java | 6 +- .../org/apache/drill/test/FixtureBuilder.java | 326 ------------------- .../org/apache/drill/test/OperatorFixture.java | 114 ++----- .../apache/drill/test/TestConfigLinkage.java | 34 +- .../org/apache/drill/test/package-info.java | 2 +- 52 files changed, 761 insertions(+), 757 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/common/src/main/java/org/apache/drill/common/util/CoreDecimalUtility.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/drill/common/util/CoreDecimalUtility.java b/common/src/main/java/org/apache/drill/common/util/CoreDecimalUtility.java index 9f61ccf..bf12cfb 100644 --- 
a/common/src/main/java/org/apache/drill/common/util/CoreDecimalUtility.java +++ b/common/src/main/java/org/apache/drill/common/util/CoreDecimalUtility.java @@ -82,10 +82,9 @@ public class CoreDecimalUtility { } public static boolean isDecimalType(TypeProtos.MinorType minorType) { - if (minorType == TypeProtos.MinorType.DECIMAL9 || minorType == TypeProtos.MinorType.DECIMAL18 || - minorType == TypeProtos.MinorType.DECIMAL28SPARSE || minorType == TypeProtos.MinorType.DECIMAL38SPARSE) { - return true; - } - return false; + return minorType == TypeProtos.MinorType.DECIMAL9 || + minorType == TypeProtos.MinorType.DECIMAL18 || + minorType == TypeProtos.MinorType.DECIMAL28SPARSE || + minorType == TypeProtos.MinorType.DECIMAL38SPARSE; } } http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java index efd26ff..cf98797 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java @@ -26,7 +26,6 @@ import org.apache.drill.common.exceptions.UserRemoteException; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.proto.UserProtos; -import org.apache.drill.test.OperatorFixture; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.common.type.HiveVarchar; import org.joda.time.DateTime; @@ -73,10 +72,8 @@ public class TestHiveStorage extends HiveTestBase { .baselineValues(200L) .go(); } finally { - final OperatorFixture.TestOptionSet testOptionSet = new OperatorFixture.TestOptionSet(); - 
test(String.format("alter session set `%s` = %s", - ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS, - Boolean.toString(testOptionSet.getDefault(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS).bool_val))); + test("alter session reset `%s`", + ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS); } } http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java index aa00b1c..d0dcace 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -44,6 +44,7 @@ public class BufferManagerImpl implements BufferManager { managedBuffers.clear(); } + @Override public DrillBuf replace(DrillBuf old, int newSize) { if (managedBuffers.remove(old.memoryAddress()) == null) { throw new IllegalStateException("Tried to remove unmanaged buffer."); @@ -52,10 +53,12 @@ public class BufferManagerImpl implements BufferManager { return getManagedBuffer(newSize); } + @Override public DrillBuf getManagedBuffer() { return getManagedBuffer(256); } + @Override public DrillBuf getManagedBuffer(int size) { DrillBuf newBuf = allocator.buffer(size, this); managedBuffers.put(newBuf.memoryAddress(), newBuf); http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java index 58bf383..0871621 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java @@ -105,6 +105,7 @@ public class ImplCreator { } /** Create RootExec and its children (RecordBatches) for given FragmentRoot */ + @SuppressWarnings("unchecked") private RootExec getRootExec(final FragmentRoot root, final FragmentContext context) throws ExecutionSetupException { final List<RecordBatch> childRecordBatches = getChildren(root, context); @@ -141,6 +142,7 @@ public class ImplCreator { return proxyUgi.doAs(new PrivilegedExceptionAction<RecordBatch>() { @Override public RecordBatch run() throws Exception { + @SuppressWarnings("unchecked") final CloseableRecordBatch batch = ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch( context, op, 
childRecordBatches); operators.addFirst(batch); @@ -153,6 +155,7 @@ public class ImplCreator { throw new ExecutionSetupException(errMsg, e); } } else { + @SuppressWarnings("unchecked") final CloseableRecordBatch batch = ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(context, op, childRecordBatches); operators.addFirst(batch); http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java index 59e7448..690a662 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -31,6 +31,7 @@ import org.apache.drill.exec.work.batch.RawBatchBuffer; public class MergingReceiverCreator implements BatchCreator<MergingReceiverPOP> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingReceiverCreator.class); + @SuppressWarnings("resource") @Override public MergingRecordBatch getBatch(FragmentContext context, MergingReceiverPOP receiver, @@ -45,6 +46,4 @@ public class MergingReceiverCreator implements BatchCreator<MergingReceiverPOP> return new MergingRecordBatch(context, receiver, buffers); } - - } http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 64be129..6c2c171 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -165,9 +165,9 @@ public class ScanBatch implements CloseableRecordBatch { try { while (true) { if (currentReader == null && !getNextReaderIfHas()) { - releaseAssets(); // All data has been read. Release resource. - done = true; - return IterOutcome.NONE; + releaseAssets(); // All data has been read. Release resource. + done = true; + return IterOutcome.NONE; } injector.injectChecked(context.getExecutionControls(), "next-allocate", OutOfMemoryException.class); currentReader.allocate(mutator.fieldVectorMap()); @@ -235,15 +235,14 @@ public class ScanBatch implements CloseableRecordBatch { } private boolean getNextReaderIfHas() throws ExecutionSetupException { - if (readers.hasNext()) { - currentReader = readers.next(); - implicitValues = implicitColumns.hasNext() ? 
implicitColumns.next() : null; - currentReader.setup(oContext, mutator); - currentReaderClassName = currentReader.getClass().getSimpleName(); - return true; - } else { + if (!readers.hasNext()) { return false; } + currentReader = readers.next(); + implicitValues = implicitColumns.hasNext() ? implicitColumns.next() : null; + currentReader.setup(oContext, mutator); + currentReaderClassName = currentReader.getClass().getSimpleName(); + return true; } private void addImplicitVectors() { @@ -251,8 +250,7 @@ public class ScanBatch implements CloseableRecordBatch { if (!implicitColumnList.isEmpty()) { for (String column : implicitColumnList.get(0).keySet()) { final MaterializedField field = MaterializedField.create(column, Types.optional(MinorType.VARCHAR)); - @SuppressWarnings("resource") - final ValueVector v = mutator.addField(field, NullableVarCharVector.class, true /*implicit field*/); + mutator.addField(field, NullableVarCharVector.class, true /*implicit field*/); } } } catch(SchemaChangeException e) { @@ -338,7 +336,6 @@ public class ScanBatch implements CloseableRecordBatch { return implicitFieldVectorMap; } - @SuppressWarnings("resource") @Override public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException { @@ -388,6 +385,7 @@ public class ScanBatch implements CloseableRecordBatch { schemaChanged = false; } + @SuppressWarnings("resource") private <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz, boolean isImplicitField) throws SchemaChangeException { Map<String, ValueVector> fieldVectorMap; http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java 
index 60355fb..d9abf40 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -21,7 +21,6 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.exception.OutOfMemoryException; -import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.ops.AccountingUserConnection; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; @@ -33,12 +32,11 @@ import org.apache.drill.exec.proto.UserBitShared.QueryData; import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; - -import com.google.common.base.Preconditions; - import org.apache.drill.exec.testing.ControlsInjector; import org.apache.drill.exec.testing.ControlsInjectorFactory; +import com.google.common.base.Preconditions; + public class ScreenCreator implements RootCreator<Screen> { //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class); private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ScreenCreator.class); http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java index 92d1882..0aac91e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache 
Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -35,10 +35,7 @@ public class TraceInjector extends AbstractPhysicalVisitor<PhysicalOperator, Fra static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceInjector.class); static int traceTagCount = 0; - - RootExec root = null; - private ScreenCreator sc = new ScreenCreator(); public static PhysicalOperator getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException { TraceInjector tI = new TraceInjector(); @@ -89,5 +86,4 @@ public class TraceInjector extends AbstractPhysicalVisitor<PhysicalOperator, Fra return newOp; } - } http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java index 939832b..0ea17d6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java @@ -131,11 +131,13 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { } private void addOutputContainerData() { + @SuppressWarnings("resource") final VarCharVector fragmentIdVector = (VarCharVector) container.getValueAccessorById( VarCharVector.class, container.getValueVectorId(SchemaPath.getSimplePath("Fragment")).getFieldIds()) .getValueVector(); AllocationHelper.allocate(fragmentIdVector, 1, 50); + @SuppressWarnings("resource") final BigIntVector summaryVector = (BigIntVector) container.getValueAccessorById(BigIntVector.class, container.getValueVectorId(SchemaPath.getSimplePath("Number of records 
written")).getFieldIds()) .getValueVector(); http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java index 5a0fa2d..a502e07 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionManager.java @@ -18,15 +18,87 @@ package org.apache.drill.exec.server.options; import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.server.options.OptionValue.Kind; import java.util.Iterator; /** - * This {@link OptionManager} implements some the basic methods and should be extended by concrete implementations. + * This {@link OptionManager} implements some the basic methods and should be + * extended by concrete implementations. */ -public abstract class BaseOptionManager extends BaseOptionSet implements OptionManager { + +public abstract class BaseOptionManager implements OptionManager { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseOptionManager.class); + /** + * Gets the current option value given a validator. + * + * @param validator the validator + * @return option value + * @throws IllegalArgumentException - if the validator is not found + */ + private OptionValue getOptionSafe(OptionValidator validator) { + final String optionName = validator.getOptionName(); + OptionValue value = getOption(optionName); + return value == null ? 
getDefault(optionName) : value; + } + + @Override + public boolean getOption(TypeValidators.BooleanValidator validator) { + return getOptionSafe(validator).bool_val; + } + + @Override + public double getOption(TypeValidators.DoubleValidator validator) { + return getOptionSafe(validator).float_val; + } + + @Override + public long getOption(TypeValidators.LongValidator validator) { + return getOptionSafe(validator).num_val; + } + + @Override + public String getOption(TypeValidators.StringValidator validator) { + return getOptionSafe(validator).string_val; + } + + @Override + public boolean getBoolean(String name) { + return getByType(name, Kind.BOOLEAN).bool_val; + } + + @Override + public long getLong(String name) { + return getByType(name, Kind.LONG).num_val; + } + + @Override + public double getDouble(String name) { + return getByType(name, Kind.DOUBLE).float_val; + } + + @Override + public String getString(String name) { + return getByType(name, Kind.STRING).string_val; + } + + private OptionValue getByType(String name, Kind dataType) { + OptionValue value = getOption(name); + if (value == null) { + throw UserException.systemError(null) + .addContext("Undefined option: " + name) + .build(logger); + } + if (value.kind != dataType) { + throw UserException.systemError(null) + .addContext("Option " + name + " is of data type " + + value.kind + " but was requested as " + dataType) + .build(logger); + } + return value; + } + @Override public OptionList getInternalOptionList() { return getAllOptionList(true); http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionSet.java deleted file mode 100644 index f664401..0000000 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/BaseOptionSet.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.drill.exec.server.options; - -/** - * A basic implementation of an {@link OptionSet}. - */ -public abstract class BaseOptionSet implements OptionSet { - /** - * Gets the current option value given a validator. - * - * @param validator the validator - * @return option value - * @throws IllegalArgumentException - if the validator is not found - */ - private OptionValue getOptionSafe(OptionValidator validator) { - final String optionName = validator.getOptionName(); - OptionValue value = getOption(optionName); - return value == null ? 
getDefault(optionName) : value; - } - - @Override - public boolean getOption(TypeValidators.BooleanValidator validator) { - return getOptionSafe(validator).bool_val; - } - - @Override - public double getOption(TypeValidators.DoubleValidator validator) { - return getOptionSafe(validator).float_val; - } - - @Override - public long getOption(TypeValidators.LongValidator validator) { - return getOptionSafe(validator).num_val; - } - - @Override - public String getOption(TypeValidators.StringValidator validator) { - return getOptionSafe(validator).string_val; - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionSet.java index 702d7df..e96d571 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionSet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionSet.java @@ -74,4 +74,48 @@ public interface OptionSet { * @return the string value */ String getOption(TypeValidators.StringValidator validator); + + /** + * Return the value of a Boolean option. + * + * @param name option name + * @return the Boolean value + * @throws IllegalArgumentException if the option is undefined or + * is not of the correct data type + */ + + boolean getBoolean(String name); + + /** + * Return the value of a long option. + * + * @param name option name + * @return the long value + * @throws IllegalArgumentException if the option is undefined or + * is not of the correct data type + */ + + long getLong(String name); + + /** + * Return the value of a double option. 
+ * + * @param name option name + * @return the double value + * @throws IllegalArgumentException if the option is undefined or + * is not of the correct data type + */ + + double getDouble(String name); + + /** + * Return the value of a String option. + * + * @param name option name + * @return the String value + * @throws IllegalArgumentException if the option is undefined or + * is not of the correct data type + */ + + String getString(String name); } http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java index 5418408..9143927 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValidator.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.server.options; -import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.server.options.OptionValue.Kind; http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java index e14a846..fc4a9f0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java @@ -133,10 +133,14 @@ public class 
OptionValue implements Comparable<OptionValue> { return create(type, name, ((Boolean) val).booleanValue(), scope); } else if (val instanceof Long) { return create(type, name, ((Long) val).longValue(), scope); + } else if (val instanceof Integer) { + return create(type, name, ((Integer) val).longValue(), scope); } else if (val instanceof String) { return create(type, name, (String) val, scope); } else if (val instanceof Double) { return create(type, name, ((Double) val).doubleValue(), scope); + } else if (val instanceof Float) { + return create(type, name, ((Float) val).doubleValue(), scope); } throw new IllegalArgumentException(String.format("Unsupported type %s", val.getClass())); http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/PersistedOptionValue.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/PersistedOptionValue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/PersistedOptionValue.java index 685799c..720a040 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/PersistedOptionValue.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/PersistedOptionValue.java @@ -269,6 +269,7 @@ public class PersistedOptionValue { * value of an option. This deserializer is essentially future proof since it only requires a value * to be stored for an option. 
*/ + @SuppressWarnings("serial") public static class Deserializer extends StdDeserializer<PersistedOptionValue> { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Deserializer.class); http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index e81ed0b..1c45547 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -36,8 +36,10 @@ import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.store.sys.PersistentStore; import org.apache.drill.exec.store.sys.PersistentStoreConfig; import org.apache.drill.exec.store.sys.PersistentStoreProvider; +import org.apache.drill.exec.store.sys.store.provider.InMemoryStoreProvider; import org.apache.drill.exec.util.AssertionUtil; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -223,7 +225,6 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea private final PersistentStoreProvider provider; - private final DrillConfig bootConfig; /** * Persistent store for options that have been changed from default. * NOTE: CRUD operations must use lowercase keys. 
@@ -243,12 +244,25 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea this.config = PersistentStoreConfig.newJacksonBuilder(lpPersistence.getMapper(), PersistedOptionValue.class) .name("sys.options") .build(); - this.bootConfig = bootConfig; this.definitions = definitions; this.defaults = populateDefaultValues(definitions, bootConfig); } /** + * Test-only, in-memory version of the system option manager. + * + * @param bootConfig Drill config + */ + + @VisibleForTesting + public SystemOptionManager(final DrillConfig bootConfig) { + this.provider = new InMemoryStoreProvider(100); + this.config = null; + this.definitions = SystemOptionManager.createDefaultOptionDefinitions(); + this.defaults = populateDefaultValues(definitions, bootConfig); + } + + /** * Initializes this option manager. * * @return this option manager @@ -315,7 +329,13 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea @Override public OptionValue getDefault(String optionName) { - return defaults.get(optionName); + OptionValue value = defaults.get(optionName); + if (value == null) { + throw UserException.systemError(null) + .addContext("Undefined default value for option: " + optionName) + .build(logger); + } + return value; } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java index 81c7ad2..293259f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java @@ -208,18 +208,6 @@ public class FileSelection { return this.wasAllPartitionsPruned; } - private static String 
commonPath(final List<FileStatus> statuses) { - if (statuses == null || statuses.isEmpty()) { - return ""; - } - - final List<String> files = Lists.newArrayList(); - for (final FileStatus status : statuses) { - files.add(status.getPath().toString()); - } - return commonPathForFiles(files); - } - /** * Returns longest common path for the given list of files. * http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java index adc3909..3c8f3a7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -19,7 +19,6 @@ package org.apache.drill.exec.store.dfs; import java.util.Map; -import com.fasterxml.jackson.annotation.JsonInclude; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java index cf30162..5d99377 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java @@ -46,7 +46,6 @@ import org.apache.hadoop.fs.Path; * This is the top level schema that responds to root level path requests. 
Also supports */ public class FileSystemSchemaFactory implements SchemaFactory{ - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemSchemaFactory.class); public static final String DEFAULT_WS_NAME = "default"; @@ -61,6 +60,7 @@ public class FileSystemSchemaFactory implements SchemaFactory{ @Override public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws IOException { + @SuppressWarnings("resource") FileSystemSchema schema = new FileSystemSchema(schemaName, schemaConfig); SchemaPlus plusOfThis = parent.add(schema.getName(), schema); schema.setPlus(plusOfThis); @@ -75,6 +75,7 @@ public class FileSystemSchemaFactory implements SchemaFactory{ super(ImmutableList.<String>of(), name); for(WorkspaceSchemaFactory f : factories){ if (f.accessible(schemaConfig.getUserName())) { + @SuppressWarnings("resource") WorkspaceSchema s = f.createSchema(getSchemaPath(), schemaConfig); schemaMap.put(s.getName(), s); } http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java index 97124bc..e02d841 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -34,10 +34,12 @@ public class ReadEntryFromHDFS extends ReadEntryWithPath implements FileWork{ this.length = length; } + @Override public long getStart() { return start; } + @Override public long getLength() { return length; } http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index 6017948..bb0b65f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -57,6 +57,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total"; private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read"; + @SuppressWarnings("resource") @Override public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException { @@ -119,7 +120,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan if (logger.isDebugEnabled()) { logger.debug(containsCorruptDates.toString()); } - if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath()))) { + if (!context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER) && !isComplex(footers.get(e.getPath()))) { readers.add( new ParquetRecordReader( context, e.getPath(), e.getRowGroupIndex(), e.getNumRecordsToRead(), fs, 
http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java index 10187b7..773f3d3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java @@ -118,7 +118,7 @@ public class ParquetSchema { loadParquetSchema(); computeFixedPart(); - if (! selectedColumnMetadata.isEmpty() && allFieldsFixedLength) { + if (! selectedColumnMetadata.isEmpty() && allFieldsFixedLength) { recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields, footer.getBlocks().get(0).getColumns().get(0).getValueCount()), ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH); } http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java index 5c8db91..5c7c8e1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.store.parquet2; -import io.netty.buffer.DrillBuf; +import static org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary; 
import java.math.BigDecimal; import java.util.ArrayList; @@ -48,7 +48,6 @@ import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; import org.apache.drill.exec.store.parquet.columnreaders.ParquetColumnMetadata; -import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader; import org.apache.drill.exec.util.DecimalUtility; import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl; import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter; @@ -67,8 +66,6 @@ import org.apache.drill.exec.vector.complex.writer.TimeStampWriter; import org.apache.drill.exec.vector.complex.writer.TimeWriter; import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter; import org.apache.drill.exec.vector.complex.writer.VarCharWriter; -import org.joda.time.DateTimeConstants; - import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.Converter; import org.apache.parquet.io.api.GroupConverter; @@ -80,10 +77,11 @@ import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Type.Repetition; +import org.joda.time.DateTimeConstants; import com.google.common.collect.Lists; -import static org.apache.drill.exec.store.parquet.ParquetReaderUtility.NanoTimeUtils.getDateTimeValueFromBinary; +import io.netty.buffer.DrillBuf; public class DrillParquetGroupConverter extends GroupConverter { @@ -182,7 +180,9 @@ public class DrillParquetGroupConverter extends GroupConverter { switch(type.getOriginalType()) { case DECIMAL: { ParquetReaderUtility.checkDecimalTypeEnabled(options); - Decimal9Writer writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal9() : mapWriter.decimal9(name); + Decimal9Writer writer = type.getRepetition() == Repetition.REPEATED + ? 
mapWriter.list(name).decimal9() + : mapWriter.decimal9(name, type.getDecimalMetadata().getScale(), type.getDecimalMetadata().getPrecision()); return new DrillDecimal9Converter(writer, type.getDecimalMetadata().getPrecision(), type.getDecimalMetadata().getScale()); } case DATE: { @@ -218,7 +218,9 @@ public class DrillParquetGroupConverter extends GroupConverter { switch(type.getOriginalType()) { case DECIMAL: { ParquetReaderUtility.checkDecimalTypeEnabled(options); - Decimal18Writer writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal18() : mapWriter.decimal18(name); + Decimal18Writer writer = type.getRepetition() == Repetition.REPEATED + ? mapWriter.list(name).decimal18() + : mapWriter.decimal18(name, type.getDecimalMetadata().getScale(), type.getDecimalMetadata().getPrecision()); return new DrillDecimal18Converter(writer, type.getDecimalMetadata().getPrecision(), type.getDecimalMetadata().getScale()); } case TIMESTAMP_MILLIS: { @@ -270,10 +272,14 @@ public class DrillParquetGroupConverter extends GroupConverter { ParquetReaderUtility.checkDecimalTypeEnabled(options); DecimalMetadata metadata = type.getDecimalMetadata(); if (metadata.getPrecision() <= 28) { - Decimal28SparseWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal28Sparse() : mapWriter.decimal28Sparse(name, metadata.getScale(), metadata.getPrecision()); + Decimal28SparseWriter writer = type.getRepetition() == Repetition.REPEATED + ? mapWriter.list(name).decimal28Sparse() + : mapWriter.decimal28Sparse(name, metadata.getScale(), metadata.getPrecision()); return new DrillBinaryToDecimal28Converter(writer, metadata.getPrecision(), metadata.getScale(), mutator.getManagedBuffer()); } else { - Decimal38SparseWriter writer = type.getRepetition() == Repetition.REPEATED ? 
mapWriter.list(name).decimal38Sparse() : mapWriter.decimal38Sparse(name, metadata.getScale(), metadata.getPrecision()); + Decimal38SparseWriter writer = type.getRepetition() == Repetition.REPEATED + ? mapWriter.list(name).decimal38Sparse() + : mapWriter.decimal38Sparse(name, metadata.getScale(), metadata.getPrecision()); return new DrillBinaryToDecimal38Converter(writer, metadata.getPrecision(), metadata.getScale(), mutator.getManagedBuffer()); } } @@ -287,10 +293,14 @@ public class DrillParquetGroupConverter extends GroupConverter { ParquetReaderUtility.checkDecimalTypeEnabled(options); DecimalMetadata metadata = type.getDecimalMetadata(); if (metadata.getPrecision() <= 28) { - Decimal28SparseWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal28Sparse() : mapWriter.decimal28Sparse(name, metadata.getScale(), metadata.getPrecision()); + Decimal28SparseWriter writer = type.getRepetition() == Repetition.REPEATED + ? mapWriter.list(name).decimal28Sparse() + : mapWriter.decimal28Sparse(name, metadata.getScale(), metadata.getPrecision()); return new DrillBinaryToDecimal28Converter(writer, metadata.getPrecision(), metadata.getScale(), mutator.getManagedBuffer()); } else { - Decimal38SparseWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal38Sparse() : mapWriter.decimal38Sparse(name, metadata.getScale(), metadata.getPrecision()); + Decimal38SparseWriter writer = type.getRepetition() == Repetition.REPEATED + ? 
mapWriter.list(name).decimal38Sparse() + : mapWriter.decimal38Sparse(name, metadata.getScale(), metadata.getPrecision()); return new DrillBinaryToDecimal38Converter(writer, metadata.getPrecision(), metadata.getScale(), mutator.getManagedBuffer()); } } else if (type.getOriginalType() == OriginalType.INTERVAL) { @@ -620,6 +630,7 @@ public class DrillParquetGroupConverter extends GroupConverter { private VarBinaryWriter writer; private VarBinaryHolder holder = new VarBinaryHolder(); + @SuppressWarnings("resource") public DrillFixedBinaryToVarbinaryConverter(VarBinaryWriter writer, int length, DrillBuf buf) { this.writer = writer; holder.buffer = buf = buf.reallocIfNeeded(length); http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java index 9e66f6d..e74c621 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java @@ -32,7 +32,6 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; -import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.ops.FragmentContext; @@ -47,19 +46,18 @@ import org.apache.drill.exec.store.parquet.RowGroupReadEntry; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.NullableIntVector; import 
org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.VariableWidthVector; import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; import org.apache.hadoop.fs.Path; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.hadoop.CodecFactory; -import org.apache.parquet.hadoop.metadata.ColumnPath; -import org.apache.parquet.io.RecordReader; import org.apache.parquet.hadoop.ColumnChunkIncReadStore; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.ColumnIOFactory; import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.RecordReader; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; @@ -84,14 +82,7 @@ public class DrillParquetReader extends AbstractRecordReader { private RecordReader<Void> recordReader; private DrillParquetRecordMaterializer recordMaterializer; private int recordCount; - private List<ValueVector> primitiveVectors; private OperatorContext operatorContext; - // The interface for the parquet-mr library does not allow re-winding, to enable us to write into our - // fixed size value vectors, we must check how full the vectors are after some number of reads, for performance - // we avoid doing this every record. 
These values are populated with system/session settings to allow users to optimize - // for performance or allow a wider record size to be suported - private final int fillLevelCheckFrequency; - private final int fillLevelCheckThreshold; private FragmentContext fragmentContext; // For columns not found in the file, we need to return a schema element with the correct number of values @@ -102,8 +93,8 @@ public class DrillParquetReader extends AbstractRecordReader { // No actual data needs to be read out of the file, we only need to return batches until we have 'read' the number of // records specified in the row group metadata long mockRecordsRead=0; - private List<SchemaPath> columnsNotFound=null; - boolean noColumnsFound = false; // true if none of the columns in the projection list is found in the schema + private List<SchemaPath> columnsNotFound; + boolean noColumnsFound; // true if none of the columns in the projection list is found in the schema // See DRILL-4203 private final ParquetReaderUtility.DateCorruptionStatus containsCorruptedDates; @@ -116,8 +107,6 @@ public class DrillParquetReader extends AbstractRecordReader { this.entry = entry; setColumns(columns); this.fragmentContext = fragmentContext; - fillLevelCheckFrequency = this.fragmentContext.getOptions().getOption(ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD).num_val.intValue(); - fillLevelCheckThreshold = this.fragmentContext.getOptions().getOption(ExecConstants.PARQUET_VECTOR_FILL_THRESHOLD).num_val.intValue(); } public static MessageType getProjection(MessageType schema, @@ -263,13 +252,13 @@ public class DrillParquetReader extends AbstractRecordReader { } } - if(!noColumnsFound) { + if (!noColumnsFound) { writer = new VectorContainerWriter(output); // Discard the columns not found in the schema when create DrillParquetRecordMaterializer, since they have been added to output already. 
+ @SuppressWarnings("unchecked") final Collection<SchemaPath> columns = columnsNotFound == null || columnsNotFound.size() == 0 ? getColumns(): CollectionUtils.subtract(getColumns(), columnsNotFound); recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection, columns, fragmentContext.getOptions(), containsCorruptedDates); - primitiveVectors = writer.getMapVector().getPrimitiveVectors(); recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer); } } catch (Exception e) { @@ -331,22 +320,6 @@ public class DrillParquetReader extends AbstractRecordReader { return count; } - private int getPercentFilled() { - int filled = 0; - for (final ValueVector v : primitiveVectors) { - filled = Math.max(filled, v.getAccessor().getValueCount() * 100 / v.getValueCapacity()); - if (v instanceof VariableWidthVector) { - filled = Math.max(filled, ((VariableWidthVector) v).getCurrentSizeInBytes() * 100 / ((VariableWidthVector) v).getByteCapacity()); - } - // TODO - need to re-enable this -// if (v instanceof RepeatedFixedWidthVector) { -// filled = Math.max(filled, ((RepeatedFixedWidthVector) v).getAccessor().getGroupCount() * 100) -// } - } - logger.debug("Percent filled: {}", filled); - return filled; - } - @Override public void close() { try { http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java index 2d778bd..f6ffdbd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java @@ -17,19 +17,17 
@@ */ package org.apache.drill.exec.store.parquet2; +import java.util.Collection; + import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; - import org.apache.parquet.io.api.GroupConverter; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.schema.MessageType; -import java.util.Collection; -import java.util.List; - public class DrillParquetRecordMaterializer extends RecordMaterializer<Void> { public DrillParquetGroupConverter root; http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java index ffe7b18..3ab85ec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java @@ -32,10 +32,7 @@ public class InMemoryStoreProvider implements PersistentStoreProvider { } @Override - public void close() throws Exception { - // TODO Auto-generated method stub - - } + public void close() throws Exception { } @Override public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) throws StoreException { @@ -43,9 +40,5 @@ public class InMemoryStoreProvider implements PersistentStoreProvider { } @Override - public void start() throws Exception { - // TODO Auto-generated method stub - - } - + public void start() throws Exception { } 
} http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java index a094f51..05ca6ea 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java +++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java @@ -78,6 +78,7 @@ import java.util.regex.Pattern; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.test.ClusterFixture; public class BaseTestQuery extends ExecTest { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class); @@ -215,6 +216,7 @@ public class BaseTestQuery extends ExecTest { bits[i] = new Drillbit(config, serviceSet, classpathScan); bits[i].run(); + @SuppressWarnings("resource") final StoragePluginRegistry pluginRegistry = bits[i].getContext().getStorage(); TestUtilities.updateDfsTestTmpSchemaLocation(pluginRegistry, dfsTestTmpSchemaLocation); TestUtilities.makeDfsTmpSchemaImmutable(pluginRegistry); @@ -377,6 +379,25 @@ public class BaseTestQuery extends ExecTest { testNoResult(1, query, args); } + public static void alterSession(String option, Object value) { + String valueStr = ClusterFixture.stringify(value); + try { + test("ALTER SESSION SET `%s` = %s", option, valueStr); + } catch(final Exception e) { + fail(String.format("Failed to set session option `%s` = %s, Error: %s", + option, valueStr, e.toString())); + } + } + + public static void resetSessionOption(String option) { + try { + test("ALTER SESSION RESET `%s`", option); + } catch(final Exception e) { + fail(String.format("Failed to reset session option `%s`, Error: %s", + option, e.toString())); + } + } + protected static void testNoResult(int interation, String 
query, Object... args) throws Exception { query = String.format(query, args); logger.debug("Running query:\n--------------\n" + query); @@ -440,9 +461,9 @@ public class BaseTestQuery extends ExecTest { } catch (AssertionError e) { e.addSuppressed(actualException); throw e; + } } } - } /** * Utility method which tests given query produces a {@link UserException} @@ -479,23 +500,19 @@ public class BaseTestQuery extends ExecTest { } protected static void setSessionOption(final String option, final boolean value) { - setSessionOption(option, Boolean.toString(value)); + alterSession(option, value); } protected static void setSessionOption(final String option, final long value) { - setSessionOption(option, Long.toString(value)); + alterSession(option, value); } protected static void setSessionOption(final String option, final double value) { - setSessionOption(option, Double.toString(value)); + alterSession(option, value); } protected static void setSessionOption(final String option, final String value) { - try { - runSQL(String.format("alter session set `%s` = %s", option, value)); - } catch(final Exception e) { - fail(String.format("Failed to set session option `%s` = %s, Error: %s", option, value, e.toString())); - } + alterSession(option, value); } public static class SilentListener implements UserResultsListener { http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java index 2003857..2e81acd 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java @@ -28,7 +28,7 @@ 
import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.test.ClientFixture; import org.apache.drill.test.ClusterFixture; -import org.apache.drill.test.FixtureBuilder; +import org.apache.drill.test.ClusterFixtureBuilder; import org.apache.drill.test.LogFixture; import org.apache.drill.test.ProfileParser; import org.apache.drill.test.QueryBuilder; @@ -94,7 +94,7 @@ public class TestHashAggrSpill { .logger("org.apache.drill", Level.WARN) ; - FixtureBuilder builder = ClusterFixture.builder() + ClusterFixtureBuilder builder = ClusterFixture.builder() .sessionOption(ExecConstants.HASHAGG_MAX_MEMORY_KEY,maxMem) .sessionOption(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY,numPartitions) .sessionOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_KEY,minBatches) http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java index 8b75f3a..b49a12e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestLimitWithExchanges.java @@ -74,8 +74,7 @@ public class TestLimitWithExchanges extends BaseTestQuery { final String[] expectedPlan5 = {"(?s)Limit\\(fetch=\\[1\\].*UnionExchange.*Limit\\(fetch=\\[1\\]\\).*Join"}; testLimitHelper(sql5, expectedPlan5, excludedPlan, 1); } finally { - final OperatorFixture.TestOptionSet testOptionSet = new OperatorFixture.TestOptionSet(); - test("alter session set `%s` = %s", ExecConstants.SLICE_TARGET, 
testOptionSet.getDefault(ExecConstants.SLICE_TARGET).getValue()); + resetSessionOption(ExecConstants.SLICE_TARGET); } } @@ -96,8 +95,7 @@ public class TestLimitWithExchanges extends BaseTestQuery { testLimitHelper(sql2, expectedPlan, excludedPlan2, 5); } finally { - final OperatorFixture.TestOptionSet testOptionSet = new OperatorFixture.TestOptionSet(); - test("alter session set `planner.slice_target` = " + testOptionSet.getDefault(ExecConstants.SLICE_TARGET).getValue()); + resetSessionOption(ExecConstants.SLICE_TARGET); } } @@ -126,8 +124,7 @@ public class TestLimitWithExchanges extends BaseTestQuery { testLimitHelper(sql3, expectedPlan2, excludedPlan2, 10); testLimitHelper(sql4, expectedPlan2, excludedPlan2, 10); } finally { - final OperatorFixture.TestOptionSet testOptionSet = new OperatorFixture.TestOptionSet(); - test("alter session set `planner.slice_target` = " + testOptionSet.getDefault(ExecConstants.SLICE_TARGET).getValue()); + resetSessionOption(ExecConstants.SLICE_TARGET); } } http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestValidationOptions.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestValidationOptions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestValidationOptions.java index d4e33b0..4b3cbff 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestValidationOptions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/validate/TestValidationOptions.java @@ -24,7 +24,7 @@ import org.apache.drill.exec.store.easy.text.compliant.CompliantTextRecordReader import org.apache.drill.test.ClientFixture; import org.apache.drill.test.ClusterFixture; import org.apache.drill.test.DrillTest; -import org.apache.drill.test.FixtureBuilder; +import 
org.apache.drill.test.ClusterFixtureBuilder; import org.apache.drill.test.LogFixture; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -59,7 +59,7 @@ public class TestValidationOptions extends DrillTest { @Test public void testOptions() throws Exception { - FixtureBuilder builder = ClusterFixture.builder() + ClusterFixtureBuilder builder = ClusterFixture.builder() .maxParallelization(1) .configProperty(ExecConstants.ENABLE_ITERATOR_VALIDATION, false) .configProperty(ExecConstants.ENABLE_VECTOR_VALIDATION, false) @@ -93,7 +93,7 @@ public class TestValidationOptions extends DrillTest { @Test public void testConfig() throws Exception { - FixtureBuilder builder = ClusterFixture.builder() + ClusterFixtureBuilder builder = ClusterFixture.builder() .maxParallelization(1) .configProperty(ExecConstants.ENABLE_ITERATOR_VALIDATION, true) .configProperty(ExecConstants.ENABLE_VECTOR_VALIDATION, true) @@ -119,7 +119,7 @@ public class TestValidationOptions extends DrillTest { @Test public void testDefaults() throws Exception { - FixtureBuilder builder = ClusterFixture.builder() + ClusterFixtureBuilder builder = ClusterFixture.builder() .maxParallelization(1) ; try (ClusterFixture cluster = builder.build(); http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java index 41be0ba..9715d14 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java @@ -17,8 +17,8 @@ */ package org.apache.drill.exec.physical.impl.writer; -import static 
org.apache.drill.exec.store.parquet.ParquetRecordWriter.DRILL_VERSION_PROPERTY; import static org.apache.drill.TestBuilder.convertToLocalTimestamp; +import static org.apache.drill.exec.store.parquet.ParquetRecordWriter.DRILL_VERSION_PROPERTY; import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS; import static org.junit.Assert.assertEquals; @@ -26,14 +26,13 @@ import java.io.File; import java.io.FileWriter; import java.math.BigDecimal; import java.sql.Date; -import java.util.Arrays; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -import com.google.common.base.Joiner; import org.apache.drill.BaseTestQuery; import org.apache.drill.categories.ParquetTest; import org.apache.drill.categories.SlowTest; @@ -44,7 +43,6 @@ import org.apache.drill.common.util.TestTools; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.fn.interp.TestConstantFolding; import org.apache.drill.exec.planner.physical.PlannerSettings; -import org.apache.drill.test.OperatorFixture; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -63,10 +61,11 @@ import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import com.google.common.base.Joiner; + @RunWith(Parameterized.class) @Category({SlowTest.class, ParquetTest.class}) public class TestParquetWriter extends BaseTestQuery { -// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestParquetWriter.class); @Parameterized.Parameters public static Collection<Object[]> data() { @@ -126,12 +125,12 @@ public class TestParquetWriter extends BaseTestQuery { @BeforeClass public static void initFs() throws Exception { fs = getLocalFileSystem(); - test(String.format("alter session set `%s` = true", 
PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY)); + alterSession(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true); } @AfterClass public static void disableDecimalDataType() throws Exception { - test(String.format("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY)); + alterSession(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, false); } @Test @@ -188,25 +187,26 @@ public class TestParquetWriter extends BaseTestQuery { try { // read all of the types with the complex reader - test(String.format("alter session set %s = true", ExecConstants.PARQUET_NEW_RECORD_READER)); + alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true); runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json"); } finally { - test(String.format("alter session set %s = false", ExecConstants.PARQUET_NEW_RECORD_READER)); + resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER); } } @Test public void testAllScalarTypesDictionary() throws Exception { try { - test(String.format("alter session set %s = true", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING)); + alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, true); /// read once with the flat reader runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json"); // read all of the types with the complex reader - test(String.format("alter session set %s = true", ExecConstants.PARQUET_NEW_RECORD_READER)); + alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true); runTestAndValidate(allTypesSelection, "*", allTypesTable, "donuts_json"); } finally { - test(String.format("alter session set %s = false", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING)); + resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER); + resetSessionOption(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING); } } @@ -221,10 +221,10 @@ public class TestParquetWriter extends BaseTestQuery { String selection = "type"; String inputTable = "cp.`donuts.json`"; try { - 
test(String.format("alter session set %s = true", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING)); + alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, true); runTestAndValidate(selection, selection, inputTable, "donuts_json"); } finally { - test(String.format("alter session set %s = false", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING)); + resetSessionOption(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING); } } @@ -262,7 +262,7 @@ public class TestParquetWriter extends BaseTestQuery { @Test public void testTPCHReadWrite1_date_convertedType() throws Exception { try { - test("alter session set `%s` = false", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING); + alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, false); String selection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " + "L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, cast(L_COMMITDATE as DATE) as L_COMMITDATE, cast(L_RECEIPTDATE as DATE) AS L_RECEIPTDATE, L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT"; String validationSelection = "L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, " + @@ -270,9 +270,7 @@ public class TestParquetWriter extends BaseTestQuery { String inputTable = "cp.`tpch/lineitem.parquet`"; runTestAndValidate(selection, validationSelection, inputTable, "lineitem_parquet_converted"); } finally { - OperatorFixture.TestOptionSet optionSet = new OperatorFixture.TestOptionSet(); - test("alter session set `%s` = %b", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, - optionSet.getDefault(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING).bool_val); + resetSessionOption(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING); } } @@ -321,29 +319,24 @@ public class TestParquetWriter extends BaseTestQuery { @Test public void testTPCHReadWriteNoDictUncompressed() throws Exception { try { - test(String.format("alter session 
set `%s` = false", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING)); - test(String.format("alter session set `%s` = 'none'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE)); + alterSession(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, false); + alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "none"); String inputTable = "cp.`tpch/supplier.parquet`"; runTestAndValidate("*", "*", inputTable, "supplier_parquet_no_dict_uncompressed"); } finally { - final OperatorFixture.TestOptionSet optionSet = new OperatorFixture.TestOptionSet(); - test(String.format("alter session set `%s` = %b", ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING, - optionSet.getDefault(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING).bool_val)); - test(String.format("alter session set `%s` = '%s'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, - optionSet.getDefault(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE).string_val)); + resetSessionOption(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING); + resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE); } } @Test public void testTPCHReadWriteDictGzip() throws Exception { try { - test(String.format("alter session set `%s` = 'gzip'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE)); + alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "gzip"); String inputTable = "cp.`tpch/supplier.parquet`"; runTestAndValidate("*", "*", inputTable, "supplier_parquet_dict_gzip"); } finally { - final OperatorFixture.TestOptionSet optionSet = new OperatorFixture.TestOptionSet(); - test(String.format("alter session set `%s` = '%s'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, - optionSet.getDefault(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE).string_val)); + resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE); } } @@ -393,22 +386,33 @@ public class TestParquetWriter extends BaseTestQuery { "cast(salary as decimal(24,2)) as decimal24, cast(salary as decimal(38,2)) as decimal38"; 
String validateSelection = "decimal8, decimal15, decimal24, decimal38"; String inputTable = "cp.`employee.json`"; - runTestAndValidate(selection, validateSelection, inputTable, "parquet_decimal"); + + // DRILL-5833: The "old" reader had a decimal bug, but the new one + // did not. The one used was random. Force the test to run both + // the old and new readers. + + try { + alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true); + runTestAndValidate(selection, validateSelection, inputTable, "parquet_decimal"); + alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, false); + runTestAndValidate(selection, validateSelection, inputTable, "parquet_decimal"); + } finally { + resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER); + } } @Test public void testMulipleRowGroups() throws Exception { try { - test(String.format("ALTER SESSION SET `%s` = %d", ExecConstants.PARQUET_BLOCK_SIZE, 1024*1024)); + alterSession(ExecConstants.PARQUET_BLOCK_SIZE, 1024*1024); String selection = "mi"; String inputTable = "cp.`customer.json`"; runTestAndValidate(selection, selection, inputTable, "foodmart_customer_parquet"); } finally { - test(String.format("ALTER SESSION SET `%s` = %d", ExecConstants.PARQUET_BLOCK_SIZE, 512*1024*1024)); + resetSessionOption(ExecConstants.PARQUET_BLOCK_SIZE); } } - @Test public void testDate() throws Exception { String selection = "cast(hire_date as DATE) as hire_date"; @@ -465,9 +469,7 @@ public class TestParquetWriter extends BaseTestQuery { .optionSettingQueriesForBaseline("alter system set `store.parquet.use_new_reader` = true") .build().run(); } finally { - final OperatorFixture.TestOptionSet optionSet = new OperatorFixture.TestOptionSet(); - test("alter system set `%s` = %b", ExecConstants.PARQUET_NEW_RECORD_READER, - optionSet.getDefault(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val); + resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER); } } @@ -486,10 +488,7 @@ public class TestParquetWriter extends BaseTestQuery { "alter 
system set `store.parquet.use_new_reader` = true") .build().run(); } finally { - final OperatorFixture.TestOptionSet optionSet = new OperatorFixture.TestOptionSet(); - test("alter system set `%s` = %b", - ExecConstants.PARQUET_NEW_RECORD_READER, - optionSet.getDefault(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val); + resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER); } } @@ -730,7 +729,6 @@ public class TestParquetWriter extends BaseTestQuery { try { deleteTableIfExists(outputFile); test("use dfs_test.tmp"); - // test("ALTER SESSION SET `planner.add_producer_consumer` = false"); String query = String.format("SELECT %s FROM %s", selection, inputTable); String create = "CREATE TABLE " + outputFile + " AS " + query; String validateQuery = String.format("SELECT %s FROM " + outputFile, validationSelection); @@ -738,13 +736,16 @@ public class TestParquetWriter extends BaseTestQuery { testBuilder() .unOrdered() - .sqlQuery(query) - .sqlBaselineQuery(validateQuery) + // Validate query is the query on the output file (the one to validate) + .sqlQuery(validateQuery) + // The baseline query is the query on the input file (the expected values) + .sqlBaselineQuery(query) .go(); Configuration hadoopConf = new Configuration(); hadoopConf.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS); Path output = new Path(getDfsTestTmpSchemaLocation(), outputFile); + @SuppressWarnings("resource") FileSystem fs = output.getFileSystem(hadoopConf); for (FileStatus file : fs.listStatus(output)) { ParquetMetadata footer = ParquetFileReader.readFooter(hadoopConf, file, SKIP_ROW_GROUPS); @@ -765,10 +766,10 @@ public class TestParquetWriter extends BaseTestQuery { public void testImpalaParquetInt96() throws Exception { compareParquetReadersColumnar("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`"); try { - test("alter session set %s = true", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP); + alterSession(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP, true); 
compareParquetReadersColumnar("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`"); } finally { - test("alter session reset %s", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP); + resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP); } } @@ -799,7 +800,7 @@ public class TestParquetWriter extends BaseTestQuery { .baselineColumns("int96_ts") .build().run(); } finally { - test("alter system reset `%s`", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP); + resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP); } } @@ -818,12 +819,12 @@ public class TestParquetWriter extends BaseTestQuery { @Test public void testImpalaParquetTimestampInt96AsTimeStamp() throws Exception { try { - test("alter session set %s = false", ExecConstants.PARQUET_NEW_RECORD_READER); + alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, false); compareParquetInt96Converters("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`"); - test("alter session set %s = true", ExecConstants.PARQUET_NEW_RECORD_READER); + alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, true); compareParquetInt96Converters("field_impala_ts", "cp.`parquet/int96_impala_1.parquet`"); } finally { - test("alter session reset `%s`", ExecConstants.PARQUET_NEW_RECORD_READER); + resetSessionOption(ExecConstants.PARQUET_NEW_RECORD_READER); } } @@ -939,7 +940,7 @@ public class TestParquetWriter extends BaseTestQuery { .build() .run(); } finally { - test("alter system reset `%s`", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP); + resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP); } } @@ -958,24 +959,22 @@ public class TestParquetWriter extends BaseTestQuery { @Test public void testTPCHReadWriteGzip() throws Exception { try { - test(String.format("alter session set `%s` = 'gzip'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE)); + alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "gzip"); String inputTable = "cp.`tpch/supplier.parquet`"; runTestAndValidate("*", "*", 
inputTable, "suppkey_parquet_dict_gzip"); } finally { - final OperatorFixture.TestOptionSet optionSet = new OperatorFixture.TestOptionSet(); - test(String.format("alter session set `%s` = '%s'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, optionSet.getDefault(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE).string_val)); + resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE); } } @Test public void testTPCHReadWriteSnappy() throws Exception { try { - test(String.format("alter session set `%s` = 'snappy'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE)); + alterSession(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, "snappy"); String inputTable = "cp.`supplier_snappy.parquet`"; runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_snappy"); } finally { - final OperatorFixture.TestOptionSet optionSet = new OperatorFixture.TestOptionSet(); - test(String.format("alter session set `%s` = '%s'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, optionSet.getDefault(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE).string_val)); + resetSessionOption(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE); } } @@ -997,7 +996,7 @@ public class TestParquetWriter extends BaseTestQuery { .build() .run(); } finally { - test("alter system reset `%s`", ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP); + resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP); } } http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java index c9d4300..2170f41 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java +++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriterEmptyFiles.java @@ -97,8 +97,7 @@ public class TestParquetWriterEmptyFiles extends BaseTestQuery { .go(); } finally { // restore the session option - final OperatorFixture.TestOptionSet optionSet = new OperatorFixture.TestOptionSet(); - test("ALTER SESSION SET `store.parquet.block-size` = %d", optionSet.getDefault(ExecConstants.PARQUET_BLOCK_SIZE).num_val); + resetSessionOption(ExecConstants.PARQUET_BLOCK_SIZE); deleteTableIfExists(outputFile); } } http://git-wip-us.apache.org/repos/asf/drill/blob/42fc11e5/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java index d2d7b0c..aa0d4ee 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSimpleExternalSort.java @@ -34,7 +34,7 @@ import org.apache.drill.exec.vector.BigIntVector; import org.apache.drill.test.ClientFixture; import org.apache.drill.test.ClusterFixture; import org.apache.drill.test.DrillTest; -import org.apache.drill.test.FixtureBuilder; +import org.apache.drill.test.ClusterFixtureBuilder; import org.apache.drill.categories.SlowTest; import org.junit.Rule; import org.junit.Test; @@ -69,7 +69,7 @@ public class TestSimpleExternalSort extends DrillTest { */ private void mergeSortWithSv2(boolean testLegacy) throws Exception { - FixtureBuilder builder = ClusterFixture.builder() + ClusterFixtureBuilder builder = ClusterFixture.builder() .configProperty(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED, false) ; try (ClusterFixture cluster = builder.build(); @@ -96,7 
+96,7 @@ public class TestSimpleExternalSort extends DrillTest { } private void sortOneKeyDescendingMergeSort(boolean testLegacy) throws Throwable { - FixtureBuilder builder = ClusterFixture.builder() + ClusterFixtureBuilder builder = ClusterFixture.builder() .configProperty(ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED, false) ; try (ClusterFixture cluster = builder.build(); @@ -147,7 +147,7 @@ public class TestSimpleExternalSort extends DrillTest { } private void sortOneKeyDescendingExternalSort(boolean testLegacy) throws Throwable { - FixtureBuilder builder = ClusterFixture.builder() + ClusterFixtureBuilder builder = ClusterFixture.builder() .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD, 4) .configProperty(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE, 4) .configProperty(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT, 4) @@ -173,7 +173,7 @@ public class TestSimpleExternalSort extends DrillTest { } private void outOfMemoryExternalSort(boolean testLegacy) throws Throwable{ - FixtureBuilder builder = ClusterFixture.builder() + ClusterFixtureBuilder builder = ClusterFixture.builder() // Probably do nothing in modern Drill .configProperty("drill.memory.fragment.max", 50_000_000) .configProperty("drill.memory.fragment.initial", 2_000_000)