Repository: drill Updated Branches: refs/heads/master 42fc11e53 -> c56de2f13
http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java index 0f27884..e7a78ed 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java @@ -21,7 +21,7 @@ import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.CodeGenerator; -import org.apache.drill.exec.ops.OperExecContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.config.ExternalSort; import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter; import org.apache.drill.exec.record.VectorAccessible; @@ -47,7 +47,7 @@ public class SorterWrapper extends BaseSortWrapper { private SingleBatchSorter sorter; - public SorterWrapper(OperExecContext opContext) { + public SorterWrapper(OperatorContext opContext) { super(opContext); } @@ -55,7 +55,7 @@ public class SorterWrapper extends BaseSortWrapper { SingleBatchSorter sorter = getSorter(convertedBatch); try { - sorter.setup(context, sv2, convertedBatch); + sorter.setup(context.getFragmentContext(), sv2, convertedBatch); sorter.sort(sv2); } catch (SchemaChangeException e) { convertedBatch.clear(); @@ -78,8 +78,8 @@ public class SorterWrapper extends BaseSortWrapper { private SingleBatchSorter newSorter(VectorAccessible batch) { CodeGenerator<SingleBatchSorter> cg = CodeGenerator.get( - SingleBatchSorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(), - context.getOptionSet()); + SingleBatchSorter.TEMPLATE_DEFINITION, context.getFragmentContext().getFunctionRegistry(), + context.getFragmentContext().getOptionSet()); ClassGenerator<SingleBatchSorter> g = cg.getRoot(); cg.plainJavaCapable(true); // Uncomment out this line to debug the generated code. http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java index b75ce77..3d7e63a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java @@ -23,13 +23,13 @@ import java.util.List; import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.exec.ops.OperExecContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.spill.SpillSet; import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.SpilledRun; import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults; import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.VectorInitializer; import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorInitializer; import com.google.common.collect.Lists; @@ -56,9 +56,9 @@ public class SpilledRuns { private final PriorityQueueCopierWrapper copierHolder; private BatchSchema schema; - private final OperExecContext context; + private final OperatorContext context; - public SpilledRuns(OperExecContext opContext, SpillSet spillSet, PriorityQueueCopierWrapper copier) { + public SpilledRuns(OperatorContext opContext, SpillSet spillSet, PriorityQueueCopierWrapper copier) { this.context = opContext; this.spillSet = spillSet; // copierHolder = new PriorityQueueCopierWrapper(opContext); http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java index 5e7be8d..489e03c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.dfs; +import org.apache.drill.exec.ops.OperatorStatReceiver; import org.apache.drill.exec.ops.OperatorStats; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.fs.FSDataInputStream; @@ -38,13 +39,13 @@ import java.util.EnumSet; public class DrillFSDataInputStream extends FSDataInputStream { private final FSDataInputStream underlyingIs; private final OpenFileTracker openFileTracker; - private final OperatorStats operatorStats; + private final OperatorStatReceiver operatorStats; - public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats) throws IOException { + public DrillFSDataInputStream(FSDataInputStream in, OperatorStatReceiver operatorStats) throws IOException { this(in, operatorStats, null); } - public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats, + public DrillFSDataInputStream(FSDataInputStream in, OperatorStatReceiver operatorStats, OpenFileTracker openFileTracker) throws IOException { super(new WrappedInputStream(in, operatorStats)); underlyingIs = in; @@ -193,9 +194,9 @@ public class DrillFSDataInputStream extends FSDataInputStream { */ private static class WrappedInputStream extends InputStream implements Seekable, PositionedReadable { final FSDataInputStream is; - final OperatorStats operatorStats; + final OperatorStatReceiver operatorStats; - WrappedInputStream(FSDataInputStream is, OperatorStats operatorStats) { + WrappedInputStream(FSDataInputStream is, OperatorStatReceiver operatorStats) { this.is = is; this.operatorStats = operatorStats; } http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java index 52e1a96..fc540aa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java @@ -26,7 +26,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; -import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.ops.OperatorStatReceiver; import org.apache.drill.exec.util.AssertionUtil; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -80,14 +80,14 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker { private final ConcurrentMap<DrillFSDataInputStream, DebugStackTrace> openedFiles = Maps.newConcurrentMap(); private final FileSystem underlyingFs; - private final OperatorStats operatorStats; + private final OperatorStatReceiver operatorStats; private final CompressionCodecFactory codecFactory; public DrillFileSystem(Configuration fsConf) throws IOException { this(fsConf, null); } - public DrillFileSystem(Configuration fsConf, OperatorStats operatorStats) throws IOException { + public DrillFileSystem(Configuration fsConf, OperatorStatReceiver operatorStats) throws IOException { this.underlyingFs = FileSystem.get(fsConf); this.codecFactory = new CompressionCodecFactory(fsConf); this.operatorStats = operatorStats; http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java index 4c90769..c58abd6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/SortTestUtilities.java @@ -27,19 +27,19 @@ import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.ops.OperExecContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.physical.impl.xsort.managed.PriorityQueueCopierWrapper.BatchMerger; -import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.test.OperatorFixture; import org.apache.drill.test.rowSet.DirectRowSet; import org.apache.drill.test.rowSet.RowSet; +import org.apache.drill.test.rowSet.RowSet.SingleRowSet; import org.apache.drill.test.rowSet.RowSetComparison; import org.apache.drill.test.rowSet.RowSetSchema; import org.apache.drill.test.rowSet.SchemaBuilder; -import org.apache.drill.test.rowSet.RowSet.SingleRowSet; import com.google.common.collect.Lists; @@ -67,7 +67,7 @@ public class SortTestUtilities { FieldReference expr = FieldReference.getWithQuotedRef("key"); Ordering ordering = new Ordering(sortOrder, expr, nullOrder); Sort popConfig = new Sort(null, Lists.newArrayList(ordering), false); - OperExecContext opContext = fixture.newOperExecContext(popConfig); + OperatorContext opContext = fixture.operatorContext(popConfig); return new PriorityQueueCopierWrapper(opContext); } http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java index a8d6e82..d83a765 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java @@ -30,7 +30,7 @@ import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.ops.OperExecContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.physical.impl.spill.SpillSet; import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults; @@ -82,7 +82,7 @@ public class TestSortImpl extends DrillTest { FieldReference expr = FieldReference.getWithQuotedRef("key"); Ordering ordering = new Ordering(sortOrder, expr, nullOrder); Sort popConfig = new Sort(null, Lists.newArrayList(ordering), false); - OperExecContext opContext = fixture.newOperExecContext(popConfig); + OperatorContext opContext = fixture.operatorContext(popConfig); QueryId queryId = QueryId.newBuilder() .setPart1(1234) .setPart2(5678) @@ -92,9 +92,9 @@ public class TestSortImpl extends DrillTest { .setMinorFragmentId(3) .setQueryId(queryId) .build(); - SortConfig sortConfig = new SortConfig(opContext.getConfig()); + SortConfig sortConfig = new SortConfig(opContext.getFragmentContext().getConfig()); - SpillSet spillSet = new SpillSet(opContext.getConfig(), handle, popConfig); + SpillSet spillSet = new SpillSet(opContext.getFragmentContext().getConfig(), handle, popConfig); PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext); SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copierHolder); return new SortImpl(opContext, sortConfig, spilledRuns, outputBatch); http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java index 7d0019d..5f04da6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSorter.java @@ -29,7 +29,7 @@ import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.OperExecContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.test.DrillTest; @@ -82,7 +82,7 @@ public class TestSorter extends DrillTest { } public void runSorterTest(Sort popConfig, SingleRowSet rowSet, SingleRowSet expected) throws Exception { - OperExecContext opContext = fixture.newOperExecContext(popConfig); + OperatorContext opContext = fixture.operatorContext(popConfig); SorterWrapper sorter = new SorterWrapper(opContext); sorter.sortBatch(rowSet.container(), rowSet.getSv2()); @@ -149,7 +149,7 @@ public class TestSorter extends DrillTest { Sort popConfig = makeSortConfig("key", sortOrder, nullOrder); this.nullable = nullable; - OperExecContext opContext = fixture.newOperExecContext(popConfig); + OperatorContext opContext = fixture.operatorContext(popConfig); sorter = new SorterWrapper(opContext); } } http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java index 66588b1..62c961d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java @@ -75,6 +75,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase { .go(); } + @SuppressWarnings("unchecked") @Test public void testSimpleHashJoin() { HashJoinPOP joinConf = new HashJoinPOP(null, null, Lists.newArrayList(joinCond("x", "EQUALS", "x1")), JoinRelType.LEFT); @@ -100,6 +101,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase { .go(); } + @SuppressWarnings("unchecked") @Test public void testSimpleMergeJoin() { MergeJoinPOP joinConf = new MergeJoinPOP(null, null, Lists.newArrayList(joinCond("x", "EQUALS", "x1")), JoinRelType.LEFT); @@ -293,6 +295,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase { // TODO(DRILL-4439) - doesn't expect incoming batches, uses instead RawFragmentBatch // need to figure out how to mock these + @SuppressWarnings("unchecked") @Ignore @Test public void testSimpleMergingReceiver() { http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java index c53536a..ca226e6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java @@ -158,7 +158,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase { return; // successful } } - Map<String, List<Object>> actualSuperVectors = new TreeMap(); + Map<String, List<Object>> actualSuperVectors = new TreeMap<String, List<Object>>(); int actualBatchNum = DrillTestWrapper.addToCombinedVectorResults(batchIterator, expectSchema, actualSuperVectors); if (expectBatchNum != null) { @@ -234,7 +234,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase { */ public class PopBuilder { - private PhysicalOperator popConfig; + protected PhysicalOperator popConfig; protected long initReservation = INIT_ALLOCATION; protected long maxAllocation = MAX_ALLOCATION; @@ -307,21 +307,24 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase { return this; } + @SuppressWarnings("resource") public PopBuilder buildAddAsInput() throws Exception { - mockOpContext(initReservation, maxAllocation); + mockOpContext(popConfig, initReservation, maxAllocation); + @SuppressWarnings("unchecked") BatchCreator<PhysicalOperator> opCreator = (BatchCreator<PhysicalOperator>) getOpCreatorReg().getOperatorCreator(popConfig.getClass()); RecordBatch batch= opCreator.getBatch(fragContext, popConfig, inputs); return parent.addInput(batch); } public RecordBatch build() throws Exception { - mockOpContext(initReservation, maxAllocation); + mockOpContext(popConfig, initReservation, maxAllocation); + @SuppressWarnings("unchecked") BatchCreator<PhysicalOperator> opCreator = (BatchCreator<PhysicalOperator>) getOpCreatorReg().getOperatorCreator(popConfig.getClass()); return opCreator.getBatch(fragContext, popConfig, inputs); } } - public abstract class ScanPopBuider<T extends ScanPopBuider> extends PopBuilder { + public abstract class ScanPopBuider<T extends ScanPopBuider<?>> extends PopBuilder { List<SchemaPath> columnsToRead = Collections.singletonList(SchemaPath.getSimplePath("*")); DrillFileSystem fs = null; @@ -333,16 +336,19 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase { super(parent); } + @SuppressWarnings("unchecked") public T fileSystem(DrillFileSystem fs) { this.fs = fs; return (T) this; } + @SuppressWarnings("unchecked") public T columnsToRead(SchemaPath ... columnsToRead) { this.columnsToRead = Lists.newArrayList(columnsToRead); return (T) this; } + @SuppressWarnings("unchecked") public T columnsToRead(String ... columnsToRead) { this.columnsToRead = Lists.newArrayList(); @@ -360,7 +366,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase { */ public class JsonScanBuilder extends ScanPopBuider<JsonScanBuilder> { List<String> jsonBatches = null; - List<String> inputPaths = Collections.EMPTY_LIST; + List<String> inputPaths = Collections.emptyList(); public JsonScanBuilder(PopBuilder parent) { super(parent); @@ -380,14 +386,16 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase { return this; } + @Override public PopBuilder buildAddAsInput() throws Exception { - mockOpContext(this.initReservation, this.maxAllocation); + mockOpContext(popConfig, this.initReservation, this.maxAllocation); RecordBatch scanBatch = getScanBatch(); return parent.addInput(scanBatch); } + @Override public RecordBatch build() throws Exception { - mockOpContext(this.initReservation, this.maxAllocation); + mockOpContext(popConfig, this.initReservation, this.maxAllocation); return getScanBatch(); } @@ -414,7 +422,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase { * Builder for parquet Scan RecordBatch. */ public class ParquetScanBuilder extends ScanPopBuider<ParquetScanBuilder> { - List<String> inputPaths = Collections.EMPTY_LIST; + List<String> inputPaths = Collections.emptyList(); public ParquetScanBuilder() { super(); @@ -429,14 +437,16 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase { return this; } + @Override public PopBuilder buildAddAsInput() throws Exception { - mockOpContext(this.initReservation, this.maxAllocation); + mockOpContext(popConfig, this.initReservation, this.maxAllocation); RecordBatch scanBatch = getScanBatch(); return parent.addInput(scanBatch); } + @Override public RecordBatch build() throws Exception { - mockOpContext(this.initReservation, this.maxAllocation); + mockOpContext(popConfig, this.initReservation, this.maxAllocation); return getScanBatch(); } @@ -465,8 +475,8 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase { } // end of ParquetScanBuilder @Override - protected void mockOpContext(long initReservation, long maxAllocation) throws Exception { - super.mockOpContext(initReservation, maxAllocation); + protected void mockOpContext(PhysicalOperator popConfig, long initReservation, long maxAllocation) throws Exception { + super.mockOpContext(popConfig, initReservation, maxAllocation); // mock ScanExecutor used by parquet reader. new NonStrictExpectations() { http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java index 157f1d7..26345c9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/PhysicalOpUnitTestBase.java @@ -78,8 +78,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import static org.apache.drill.exec.physical.base.AbstractBase.INIT_ALLOCATION; - /** * Look! Doesn't extend BaseTestQuery!! */ @@ -195,11 +193,12 @@ public class PhysicalOpUnitTestBase extends ExecTest { private long initReservation = AbstractBase.INIT_ALLOCATION; private long maxAllocation = AbstractBase.MAX_ALLOCATION; + @SuppressWarnings({ "unchecked", "resource" }) public void go() { BatchCreator<PhysicalOperator> opCreator; RecordBatch testOperator; try { - mockOpContext(initReservation, maxAllocation); + mockOpContext(popConfig, initReservation, maxAllocation); opCreator = (BatchCreator<PhysicalOperator>) opCreatorReg.getOperatorCreator(popConfig.getClass()); @@ -297,12 +296,14 @@ public class PhysicalOpUnitTestBase extends ExecTest { // optManager.getOption(withAny(new TypeValidators.StringValidator("", "try"))); result = "try"; // optManager.getOption(withAny(new TypeValidators.PositiveLongValidator("", 1l, 1l))); result = 10; fragContext.getOptions(); result = optionManager; + fragContext.getOptionSet(); result = optionManager; fragContext.getManagedBuffer(); result = bufManager.getManagedBuffer(); fragContext.shouldContinue(); result = true; fragContext.getExecutionControls(); result = executionControls; fragContext.getFunctionRegistry(); result = funcReg; fragContext.getConfig(); result = drillConf; fragContext.getHandle(); result = ExecProtos.FragmentHandle.getDefaultInstance(); + fragContext.getFunctionRegistry(); result = funcReg; try { CodeGenerator<?> cg = CodeGenerator.get(templateClassDefinition, funcReg); cg.plainJavaCapable(true); @@ -332,13 +333,16 @@ public class PhysicalOpUnitTestBase extends ExecTest { }; } - protected void mockOpContext(long initReservation, long maxAllocation) throws Exception{ + protected void mockOpContext(final PhysicalOperator popConfig, long initReservation, long maxAllocation) throws Exception{ final BufferAllocator allocator = this.allocator.newChildAllocator("allocator_for_operator_test", initReservation, maxAllocation); new NonStrictExpectations() { { opContext.getStats();result = opStats; + opContext.getStatsWriter(); result = opStats; opContext.getAllocator(); result = allocator; - fragContext.newOperatorContext(withAny(popConf));result = opContext; + opContext.getFragmentContext(); result = fragContext; + opContext.getOperatorDefn(); result = popConfig; + fragContext.newOperatorContext(withAny(popConf)); result = opContext; } }; } http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java index 09abf12..c03f0b7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java @@ -17,32 +17,33 @@ */ package org.apache.drill.test; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.scanner.ClassPathScanner; import org.apache.drill.common.scanner.persistence.ScanResult; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.compile.CodeCompiler; -import org.apache.drill.exec.exception.ClassTransformationException; -import org.apache.drill.exec.expr.ClassGenerator; -import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.RootAllocatorFactory; -import org.apache.drill.exec.ops.AbstractOperatorExecContext; -import org.apache.drill.exec.ops.FragmentExecContext; +import org.apache.drill.exec.ops.BaseFragmentContext; +import org.apache.drill.exec.ops.BaseOperatorContext; +import org.apache.drill.exec.ops.BufferManager; +import org.apache.drill.exec.ops.BufferManagerImpl; +import org.apache.drill.exec.ops.FragmentContextInterface; import org.apache.drill.exec.ops.MetricDef; -import org.apache.drill.exec.ops.OperExecContext; -import org.apache.drill.exec.ops.OperExecContextImpl; -import org.apache.drill.exec.ops.OperatorExecContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStatReceiver; +import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.options.OptionSet; import org.apache.drill.exec.server.options.SystemOptionManager; import org.apache.drill.exec.testing.ExecutionControls; @@ -53,6 +54,9 @@ import org.apache.drill.test.rowSet.IndirectRowSet; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet; import org.apache.drill.test.rowSet.RowSetBuilder; +import org.apache.hadoop.security.UserGroupInformation; + +import com.google.common.util.concurrent.ListenableFuture; /** * Test fixture for operator and (especially) "sub-operator" tests. @@ -120,29 +124,30 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable { * provide test-specific versions of various other services. */ - public static class TestCodeGenContext implements FragmentExecContext { + public static class TestFragmentContext extends BaseFragmentContext { private final DrillConfig config; private final OptionSet options; private final CodeCompiler compiler; - private final FunctionImplementationRegistry functionRegistry; private ExecutionControls controls; + private final BufferManagerImpl bufferManager; - public TestCodeGenContext(DrillConfig config, OptionSet options) { + public TestFragmentContext(DrillConfig config, OptionSet options, BufferAllocator allocator) { + super(newFunctionRegistry(config, options)); this.config = config; this.options = options; - ScanResult classpathScan = ClassPathScanner.fromPrescan(config); - functionRegistry = new FunctionImplementationRegistry(config, classpathScan, options); compiler = new CodeCompiler(config, options); + bufferManager = new BufferManagerImpl(allocator); } - public void setExecutionControls(ExecutionControls controls) { - this.controls = controls; + private static FunctionImplementationRegistry newFunctionRegistry( + DrillConfig config, OptionSet options) { + ScanResult classpathScan = ClassPathScanner.fromPrescan(config); + return new FunctionImplementationRegistry(config, classpathScan, options); } - @Override - public FunctionImplementationRegistry getFunctionRegistry() { - return functionRegistry; + public void setExecutionControls(ExecutionControls controls) { + this.controls = controls; } @Override @@ -151,40 +156,33 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable { } @Override - public <T> T getImplementationClass(final ClassGenerator<T> cg) - throws ClassTransformationException, IOException { - return getImplementationClass(cg.getCodeGenerator()); + public boolean shouldContinue() { + return true; } @Override - public <T> T getImplementationClass(final CodeGenerator<T> cg) - throws ClassTransformationException, IOException { - return compiler.createInstance(cg); + public ExecutionControls getExecutionControls() { + return controls; } @Override - public <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException { - return getImplementationClass(cg.getCodeGenerator(), instanceCount); + public DrillConfig getConfig() { + return config; } @Override - public <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException { - return compiler.createInstances(cg, instanceCount); + public DrillbitContext getDrillbitContext() { + throw new UnsupportedOperationException("Drillbit context not available for operator unit tests"); } @Override - public boolean shouldContinue() { - return true; + protected CodeCompiler getCompiler() { + return compiler; } @Override - public ExecutionControls getExecutionControls() { - return controls; - } - - @Override - public DrillConfig getConfig() { - return config; + protected BufferManager getBufferManager() { + return bufferManager; } } @@ -234,10 +232,17 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable { private void setStat(int metricId, double value) { stats.put(metricId, value); } + + // Timing stats not supported for test. + @Override + public void startWait() { } + + @Override + public void stopWait() { } } private final SystemOptionManager options; - private final TestCodeGenContext context; + private final TestFragmentContext context; private final OperatorStatReceiver stats; protected OperatorFixture(OperatorFixtureBuilder builder) { @@ -252,7 +257,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable { if (builder.systemOptions != null) { applySystemOptions(builder.systemOptions); } - context = new TestCodeGenContext(config, options); + context = new TestFragmentContext(config, options, allocator); stats = new MockStats(); } @@ -263,7 +268,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable { } public SystemOptionManager options() { return options; } - public FragmentExecContext fragmentExecContext() { return context; } + public FragmentContextInterface fragmentExecContext() { return context; } @Override public void close() throws Exception { @@ -284,10 +289,6 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable { return builder().build(); } - public OperExecContext newOperExecContext(PhysicalOperator opDefn) { - return new OperExecContextImpl(context, allocator, stats, opDefn, null); - } - public RowSetBuilder rowSetBuilder(BatchSchema schema) { return new RowSetBuilder(allocator, schema); } @@ -309,7 +310,36 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable { } } - public OperatorExecContext operatorContext(PhysicalOperator config) { - return new AbstractOperatorExecContext(allocator(), config, context.getExecutionControls(), stats); + public static class TestOperatorContext extends BaseOperatorContext { + + private final OperatorStatReceiver stats; + + public TestOperatorContext(FragmentContextInterface fragContext, + BufferAllocator allocator, + PhysicalOperator config, + OperatorStatReceiver stats) { + super(fragContext, allocator, config); + this.stats = stats; + } + + @Override + public OperatorStatReceiver getStatsWriter() { + return stats; + } + + @Override + public OperatorStats getStats() { + throw new UnsupportedOperationException("getStats() not supported for tests"); + } + + @Override + public <RESULT> ListenableFuture<RESULT> runCallableAs( + UserGroupInformation proxyUgi, Callable<RESULT> callable) { + throw new UnsupportedOperationException("Not yet"); + } + } + + public OperatorContext operatorContext(PhysicalOperator config) { + return new TestOperatorContext(context, allocator(), config, stats); } }