DRILL-5842: Refactor fragment, operator contexts This closes #978
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c56de2f1 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c56de2f1 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c56de2f1 Branch: refs/heads/master Commit: c56de2f13a36d673f0c5836d44817137e64b91e4 Parents: 42fc11e Author: Paul Rogers <prog...@maprtech.com> Authored: Wed Oct 4 22:43:44 2017 -0700 Committer: Paul Rogers <prog...@maprtech.com> Committed: Mon Nov 13 14:55:39 2017 -0800 ---------------------------------------------------------------------- .../exec/ops/AbstractOperatorExecContext.java | 95 --------- .../drill/exec/ops/AccountingDataTunnel.java | 3 +- .../drill/exec/ops/BaseFragmentContext.java | 90 +++++++++ .../drill/exec/ops/BaseOperatorContext.java | 196 +++++++++++++++++++ .../apache/drill/exec/ops/FragmentContext.java | 81 +++----- .../exec/ops/FragmentContextInterface.java | 149 ++++++++++++++ .../drill/exec/ops/FragmentExecContext.java | 131 ------------- .../apache/drill/exec/ops/OperExecContext.java | 90 --------- .../drill/exec/ops/OperExecContextImpl.java | 146 -------------- .../apache/drill/exec/ops/OperatorContext.java | 97 ++++++++- .../drill/exec/ops/OperatorContextImpl.java | 76 ++----- .../drill/exec/ops/OperatorExecContext.java | 46 ----- .../drill/exec/ops/OperatorStatReceiver.java | 4 + .../apache/drill/exec/ops/OperatorStats.java | 2 + .../drill/exec/physical/impl/ScanBatch.java | 5 +- .../physical/impl/xsort/SingleBatchSorter.java | 4 +- .../impl/xsort/SingleBatchSorterTemplate.java | 6 +- .../impl/xsort/managed/BaseSortWrapper.java | 9 +- .../impl/xsort/managed/BaseWrapper.java | 8 +- .../impl/xsort/managed/BufferedBatches.java | 6 +- .../impl/xsort/managed/ExternalSortBatch.java | 10 +- .../impl/xsort/managed/MSortTemplate.java | 8 +- .../physical/impl/xsort/managed/MSorter.java | 4 +- .../impl/xsort/managed/MergeSortWrapper.java | 15 +- .../managed/PriorityQueueCopierWrapper.java | 10 +- .../physical/impl/xsort/managed/SortImpl.java | 10 +- .../impl/xsort/managed/SortMetrics.java | 1 + .../impl/xsort/managed/SorterWrapper.java | 10 +- .../impl/xsort/managed/SpilledRuns.java | 8 +- .../exec/store/dfs/DrillFSDataInputStream.java | 11 +- .../drill/exec/store/dfs/DrillFileSystem.java | 6 +- .../impl/xsort/managed/SortTestUtilities.java | 8 +- .../impl/xsort/managed/TestSortImpl.java | 8 +- .../physical/impl/xsort/managed/TestSorter.java | 6 +- .../physical/unit/BasicPhysicalOpUnitTest.java | 3 + .../physical/unit/MiniPlanUnitTestBase.java | 36 ++-- .../physical/unit/PhysicalOpUnitTestBase.java | 14 +- .../org/apache/drill/test/OperatorFixture.java | 124 +++++++----- 38 files changed, 768 insertions(+), 768 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java deleted file mode 100644 index ebef55c..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AbstractOperatorExecContext.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.ops; - -import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.testing.ExecutionControls; - -import io.netty.buffer.DrillBuf; - -/** - * Implementation of {@link OperatorExecContext} that provides services - * needed by most run-time operators. Excludes services that need the - * entire Drillbit. Allows easy testing of operator code that uses this - * interface. - */ - -public class AbstractOperatorExecContext implements OperatorExecContext { - - protected final BufferAllocator allocator; - protected final ExecutionControls executionControls; - protected final PhysicalOperator popConfig; - protected final BufferManager manager; - protected OperatorStatReceiver statsWriter; - - public AbstractOperatorExecContext(BufferAllocator allocator, PhysicalOperator popConfig, - ExecutionControls executionControls, - OperatorStatReceiver stats) { - this.allocator = allocator; - this.popConfig = popConfig; - this.manager = new BufferManagerImpl(allocator); - statsWriter = stats; - - this.executionControls = executionControls; - } - - @Override - public DrillBuf replace(DrillBuf old, int newSize) { - return manager.replace(old, newSize); - } - - @Override - public DrillBuf getManagedBuffer() { - return manager.getManagedBuffer(); - } - - @Override - public DrillBuf getManagedBuffer(int size) { - return manager.getManagedBuffer(size); - } - - @Override - public ExecutionControls getExecutionControls() { - return executionControls; - } - - @Override - public BufferAllocator getAllocator() { - if (allocator == null) { - throw new UnsupportedOperationException("Operator context does not have an allocator"); - } - return allocator; - } - - @Override - public void close() { - try { - manager.close(); - } finally { - if (allocator != null) { - allocator.close(); - } - } - } - - @Override - public OperatorStatReceiver getStatsWriter() { - return statsWriter; - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java index 22923bb..44aa280 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -23,7 +23,6 @@ import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.rpc.data.DataTunnel; import org.apache.drill.exec.testing.ControlsInjector; import org.apache.drill.exec.testing.ExecutionControls; -import org.apache.drill.exec.testing.ExecutionControlsInjector; import org.slf4j.Logger; /** http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java new file mode 100644 index 0000000..a39213c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.ops; + +import java.io.IOException; +import java.util.List; + +import org.apache.drill.exec.compile.CodeCompiler; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.CodeGenerator; +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; + +import io.netty.buffer.DrillBuf; + +/** + * Common implementation for both the test and production versions + * of the fragment context. + */ + +public abstract class BaseFragmentContext implements FragmentContextInterface { + + private final FunctionImplementationRegistry funcRegistry; + + public BaseFragmentContext(final FunctionImplementationRegistry funcRegistry) { + this.funcRegistry = funcRegistry; + } + + @Override + public FunctionImplementationRegistry getFunctionRegistry() { + return funcRegistry; + } + + protected abstract CodeCompiler getCompiler(); + + @Override + public <T> T getImplementationClass(final ClassGenerator<T> cg) + throws ClassTransformationException, IOException { + return getImplementationClass(cg.getCodeGenerator()); + } + + @Override + public <T> T getImplementationClass(final CodeGenerator<T> cg) + throws ClassTransformationException, IOException { + return getCompiler().createInstance(cg); + } + + @Override + public <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException { + return getImplementationClass(cg.getCodeGenerator(), instanceCount); + } + + @Override + public <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException { + return getCompiler().createInstances(cg, instanceCount); + } + + protected abstract BufferManager getBufferManager(); + + @Override + public DrillBuf replace(final DrillBuf old, final int newSize) { + return getBufferManager().replace(old, newSize); + } + + @Override + public DrillBuf getManagedBuffer() { + return getBufferManager().getManagedBuffer(); + } + + @Override + public DrillBuf getManagedBuffer(final int size) { + return getBufferManager().getManagedBuffer(size); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java new file mode 100644 index 0000000..123f8fa --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseOperatorContext.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.ops; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.testing.ControlsInjector; +import org.apache.drill.exec.testing.ExecutionControls; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Preconditions; + +import io.netty.buffer.DrillBuf; + +/** + * Implementation of {@link OperatorContext} that provides services + * needed by most run-time operators. Excludes services that need the + * entire Drillbit. This class provides services common to the test-time + * version of the operator context and the full production-time context + * that includes network services. + */ + +public abstract class BaseOperatorContext implements OperatorContext { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseOperatorContext.class); + + protected final FragmentContextInterface context; + protected final BufferAllocator allocator; + protected final PhysicalOperator popConfig; + protected final BufferManager manager; + private DrillFileSystem fs; + private ControlsInjector injector; + + public BaseOperatorContext(FragmentContextInterface context, BufferAllocator allocator, + PhysicalOperator popConfig) { + this.context = context; + this.allocator = allocator; + this.popConfig = popConfig; + this.manager = new BufferManagerImpl(allocator); + } + + @Override + public FragmentContextInterface getFragmentContext() { + return context; + } + + @SuppressWarnings("unchecked") + @Override + public <T extends PhysicalOperator> T getOperatorDefn() { + return (T) popConfig; + } + + public String getName() { + return popConfig.getClass().getName(); + } + + @Override + public DrillBuf replace(DrillBuf old, int newSize) { + return manager.replace(old, newSize); + } + + @Override + public DrillBuf getManagedBuffer() { + return manager.getManagedBuffer(); + } + + @Override + public DrillBuf getManagedBuffer(int size) { + return manager.getManagedBuffer(size); + } + + @Override + public ExecutionControls getExecutionControls() { + return context.getExecutionControls(); + } + + @Override + public BufferAllocator getAllocator() { + if (allocator == null) { + throw new UnsupportedOperationException("Operator context does not have an allocator"); + } + return allocator; + } + + // Allow an operator to use the thread pool + @Override + public ExecutorService getExecutor() { + return context.getDrillbitContext().getExecutor(); + } + + @Override + public ExecutorService getScanExecutor() { + return context.getDrillbitContext().getScanExecutor(); + } + + @Override + public ExecutorService getScanDecodeExecutor() { + return context.getDrillbitContext().getScanDecodeExecutor(); + } + + @Override + public void setInjector(ControlsInjector injector) { + this.injector = injector; + } + + @Override + public ControlsInjector getInjector() { + return injector; + } + + @Override + public void injectUnchecked(String desc) { + ExecutionControls executionControls = context.getExecutionControls(); + if (injector != null && executionControls != null) { + injector.injectUnchecked(executionControls, desc); + } + } + + @Override + public <T extends Throwable> void injectChecked(String desc, Class<T> exceptionClass) + throws T { + ExecutionControls executionControls = context.getExecutionControls(); + if (injector != null && executionControls != null) { + injector.injectChecked(executionControls, desc, exceptionClass); + } + } + + @Override + public void close() { + RuntimeException ex = null; + try { + manager.close(); + } catch (RuntimeException e) { + ex = e; + } + try { + if (allocator != null) { + allocator.close(); + } + } catch (RuntimeException e) { + ex = ex == null ? e : ex; + } + try { + if (fs != null) { + fs.close(); + fs = null; + } + } catch (IOException e) { + if (ex == null) { + ex = UserException + .resourceError(e) + .addContext("Failed to close the Drill file system for " + getName()) + .build(logger); + } + } + if (ex != null) { + throw ex; + } + } + + @Override + public DrillFileSystem newFileSystem(Configuration conf) throws IOException { + Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext"); + fs = new DrillFileSystem(conf, getStatsWriter()); + return fs; + } + + /** + * Creates a DrillFileSystem that does not automatically track operator stats. + */ + @Override + public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException { + Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext"); + fs = new DrillFileSystem(conf, null); + return fs; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index 19ffca2..1cde97a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,22 +17,18 @@ */ package org.apache.drill.exec.ops; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import io.netty.buffer.DrillBuf; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; + import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.compile.CodeCompiler; import org.apache.drill.exec.exception.OutOfMemoryException; -import org.apache.drill.exec.expr.ClassGenerator; -import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.expr.holders.ValueHolder; import org.apache.drill.exec.memory.BufferAllocator; @@ -59,15 +55,21 @@ import org.apache.drill.exec.testing.ExecutionControls; import org.apache.drill.exec.util.ImpersonationUtil; import org.apache.drill.exec.work.batch.IncomingBuffers; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executor; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import io.netty.buffer.DrillBuf; /** * Contextual objects required for execution of a particular fragment. + * This is the implementation; use <tt>FragmentContextInterface</tt> + * in code to allow tests to use test-time implementations. */ -public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExecContext { + +public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class); private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap(); @@ -77,7 +79,6 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe private final UserClientConnection connection; // is null if this context is for non-root fragment private final QueryContext queryContext; // is null if this context is for non-root fragment private final FragmentStats stats; - private final FunctionImplementationRegistry funcRegistry; private final BufferAllocator allocator; private final PlanFragment fragment; private final ContextInformation contextInformation; @@ -87,7 +88,6 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe private ExecutorState executorState; private final ExecutionControls executionControls; - private final SendingAccountor sendingAccountor = new SendingAccountor(); private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() { @Override @@ -135,12 +135,12 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext, final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException { + super(funcRegistry); this.context = dbContext; this.queryContext = queryContext; this.connection = connection; this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler); this.fragment = fragment; - this.funcRegistry = funcRegistry; contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext()); logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial()); @@ -225,6 +225,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe return executorState.shouldContinue(); } + @Override public DrillbitContext getDrillbitContext() { return context; } @@ -313,25 +314,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe } @Override - public <T> T getImplementationClass(final ClassGenerator<T> cg) - throws ClassTransformationException, IOException { - return getImplementationClass(cg.getCodeGenerator()); - } - - @Override - public <T> T getImplementationClass(final CodeGenerator<T> cg) - throws ClassTransformationException, IOException { - return context.getCompiler().createInstance(cg); - } - - @Override - public <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException { - return getImplementationClass(cg.getCodeGenerator(), instanceCount); - } - - @Override - public <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) throws ClassTransformationException, IOException { - return context.getCompiler().createInstances(cg, instanceCount); + protected CodeCompiler getCompiler() { + return context.getCompiler(); } public AccountingUserConnection getUserDataTunnel() { @@ -383,11 +367,6 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe } @Override - public FunctionImplementationRegistry getFunctionRegistry() { - return funcRegistry; - } - - @Override public DrillConfig getConfig() { return context.getConfig(); } @@ -439,19 +418,6 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe } } - public DrillBuf replace(final DrillBuf old, final int newSize) { - return bufferManager.replace(old, newSize); - } - - @Override - public DrillBuf getManagedBuffer() { - return bufferManager.getManagedBuffer(); - } - - public DrillBuf getManagedBuffer(final int size) { - return bufferManager.getManagedBuffer(size); - } - @Override public PartitionExplorer getPartitionExplorer() { throw new UnsupportedOperationException(String.format("The partition explorer interface can only be used " + @@ -494,6 +460,11 @@ public class FragmentContext implements AutoCloseable, UdfUtilities, FragmentExe return buffers.isDone(); } + @Override + protected BufferManager getBufferManager() { + return bufferManager; + } + public interface ExecutorState { /** * Whether execution should continue. http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextInterface.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextInterface.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextInterface.java new file mode 100644 index 0000000..7d4ba18 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextInterface.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.ops; + +import java.io.IOException; +import java.util.List; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.CodeGenerator; +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.OptionSet; +import org.apache.drill.exec.testing.ExecutionControls; + +import io.netty.buffer.DrillBuf; + +/** + * Fragment context interface: separates implementation from definition. + * Allows unit testing by mocking or reimplementing services with + * test-time versions. The name is awkward, chosen to avoid renaming + * the implementation class which is used in many places in legacy code. + * New code should use this interface, and the names should eventually + * be swapped with <tt>FragmentContext</tt> becoming + * <tt>FragmentContextImpl</tt> and this interface becoming + * <tt>FragmentContext</tt>. + */ + +public interface FragmentContextInterface { + + /** + * Drillbit context. Valid only in production; returns null in + * operator test environments. + */ + + DrillbitContext getDrillbitContext(); + + /** + * Returns the UDF registry. + * @return the UDF registry + */ + FunctionImplementationRegistry getFunctionRegistry(); + /** + * Returns a read-only version of the session options. + * @return the session options + */ + OptionSet getOptionSet(); + + /** + * Generates code for a class given a {@link ClassGenerator}, + * and returns a single instance of the generated class. (Note + * that the name is a misnomer, it would be better called + * <tt>getImplementationInstance</tt>.) + * + * @param cg the class generator + * @return an instance of the generated class + */ + + <T> T getImplementationClass(final ClassGenerator<T> cg) + throws ClassTransformationException, IOException; + + /** + * Generates code for a class given a {@link CodeGenerator}, + * and returns a single instance of the generated class. (Note + * that the name is a misnomer, it would be better called + * <tt>getImplementationInstance</tt>.) + * + * @param cg the code generator + * @return an instance of the generated class + */ + + <T> T getImplementationClass(final CodeGenerator<T> cg) + throws ClassTransformationException, IOException; + + /** + * Generates code for a class given a {@link ClassGenerator}, and returns the + * specified number of instances of the generated class. (Note that the name + * is a misnomer, it would be better called + * <tt>getImplementationInstances</tt>.) + * + * @param cg + * the class generator + * @return list of instances of the generated class + */ + + <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount) + throws ClassTransformationException, IOException; + + /** + * Generates code for a class given a {@link CodeGenerator}, and returns the + * specified number of instances of the generated class. (Note that the name + * is a misnomer, it would be better called + * <tt>getImplementationInstances</tt>.) + * + * @param cg + * the code generator + * @return list of instances of the generated class + */ + + <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) + throws ClassTransformationException, IOException; + + /** + * Determine if fragment execution has been interrupted. + * @return true if execution should continue, false if an interruption has + * occurred and fragment execution should halt + */ + + boolean shouldContinue(); + + /** + * Return the set of execution controls used to inject faults into running + * code for testing. + * + * @return the execution controls + */ + ExecutionControls getExecutionControls(); + + /** + * Returns the Drill configuration for this run. Note that the config is + * global and immutable. + * + * @return the Drill configuration + */ + + DrillConfig getConfig(); + + DrillBuf replace(DrillBuf old, int newSize); + + DrillBuf getManagedBuffer(); + + DrillBuf getManagedBuffer(int size); +} http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentExecContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentExecContext.java deleted file mode 100644 index 526c030..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentExecContext.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.ops; - -import java.io.IOException; -import java.util.List; - -import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.exception.ClassTransformationException; -import org.apache.drill.exec.expr.ClassGenerator; -import org.apache.drill.exec.expr.CodeGenerator; -import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; -import org.apache.drill.exec.server.options.OptionSet; -import org.apache.drill.exec.testing.ExecutionControls; - -/** - * Services passed to fragments that deal only with execution details - * such as the function registry, options, code generation and the like. - * Does not include top-level services such as network endpoints. Code - * written to use this interface can be unit tested quite easily using - * the {@link OperatorContext} class. Code that uses the wider, - * more global {@link FragmentContext} must be tested in the context - * of the entire Drill server, or using mocks for the global services. - */ - -public interface FragmentExecContext { - /** - * Returns the UDF registry. - * @return the UDF registry - */ - FunctionImplementationRegistry getFunctionRegistry(); - /** - * Returns a read-only version of the session options. - * @return the session options - */ - OptionSet getOptionSet(); - - /** - * Generates code for a class given a {@link ClassGenerator}, - * and returns a single instance of the generated class. (Note - * that the name is a misnomer, it would be better called - * <tt>getImplementationInstance</tt>.) - * - * @param cg the class generator - * @return an instance of the generated class - */ - - <T> T getImplementationClass(final ClassGenerator<T> cg) - throws ClassTransformationException, IOException; - - /** - * Generates code for a class given a {@link CodeGenerator}, - * and returns a single instance of the generated class. (Note - * that the name is a misnomer, it would be better called - * <tt>getImplementationInstance</tt>.) - * - * @param cg the code generator - * @return an instance of the generated class - */ - - <T> T getImplementationClass(final CodeGenerator<T> cg) - throws ClassTransformationException, IOException; - - /** - * Generates code for a class given a {@link ClassGenerator}, and returns the - * specified number of instances of the generated class. (Note that the name - * is a misnomer, it would be better called - * <tt>getImplementationInstances</tt>.) - * - * @param cg - * the class generator - * @return list of instances of the generated class - */ - - <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount) - throws ClassTransformationException, IOException; - - /** - * Generates code for a class given a {@link CodeGenerator}, and returns the - * specified number of instances of the generated class. (Note that the name - * is a misnomer, it would be better called - * <tt>getImplementationInstances</tt>.) - * - * @param cg - * the code generator - * @return list of instances of the generated class - */ - - <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) - throws ClassTransformationException, IOException; - - /** - * Determine if fragment execution has been interrupted. - * @return true if execution should continue, false if an interruption has - * occurred and fragment execution should halt - */ - - boolean shouldContinue(); - - /** - * Return the set of execution controls used to inject faults into running - * code for testing. - * - * @return the execution controls - */ - ExecutionControls getExecutionControls(); - - /** - * Returns the Drill configuration for this run. Note that the config is - * global and immutable. - * - * @return the Drill configuration - */ - - DrillConfig getConfig(); -} http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContext.java deleted file mode 100644 index 89f3b63..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContext.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.ops; - -import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.testing.ControlsInjector; - -/** - * Defines the set of services used by operator implementations. This - * is a subset of the full {@link OperatorContext} which removes global - * services such as network endpoints. Code written to this interface - * can be easily unit tested using the {@link OperatorFixture} class. - * Code that needs global services must be tested in the Drill server - * as a whole, or using mocks for global services. - */ - -public interface OperExecContext extends FragmentExecContext { - - /** - * Return the physical operator definition created by the planner and passed - * into the Drillbit executing the query. - * @return the physical operator definition - */ - - <T extends PhysicalOperator> T getOperatorDefn(); - - /** - * Return the memory allocator for this operator. - * - * @return the per-operator memory allocator - */ - - BufferAllocator getAllocator(); - - /** - * A write-only interface to the Drill statistics mechanism. Allows - * operators to update statistics. - * @return operator statistics - */ - - OperatorStatReceiver getStats(); - - /** - * Returns the fault injection mechanism used to introduce faults at runtime - * for testing. - * @return the fault injector - */ - - ControlsInjector getInjector(); - - /** - * Insert an unchecked fault (exception). Handles the details of checking if - * fault injection is enabled and this particular fault is selected. - * @param desc the description of the fault used to match a fault - * injection parameter to determine if the fault should be injected - * @throws RuntimeException an unchecked exception if the fault is enabled - */ - - void injectUnchecked(String desc); - - /** - * Insert a checked fault (exception) of the given class. Handles the details - * of checking if fault injection is enabled and this particular fault is - * selected. - * - * @param desc the description of the fault used to match a fault - * injection parameter to determine if the fault should be injected - * @param exceptionClass the class of exeception to be thrown - * @throws T if the fault is enabled - */ - - <T extends Throwable> void injectChecked(String desc, Class<T> exceptionClass) - throws T; -} http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContextImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContextImpl.java deleted file mode 100644 index b625e76..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperExecContextImpl.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.ops; - -import java.io.IOException; -import java.util.List; - -import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.exception.ClassTransformationException; -import org.apache.drill.exec.expr.ClassGenerator; -import org.apache.drill.exec.expr.CodeGenerator; -import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; -import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.server.options.OptionSet; -import org.apache.drill.exec.testing.ControlsInjector; -import org.apache.drill.exec.testing.ExecutionControls; - -/** - * Implementation of the context used by low-level operator - * tasks. - */ - -public class OperExecContextImpl implements OperExecContext { - - private FragmentExecContext fragmentContext; - private PhysicalOperator operDefn; - private ControlsInjector injector; - private BufferAllocator allocator; - private OperatorStatReceiver stats; - - public OperExecContextImpl(FragmentExecContext fragContext, OperatorContext opContext, PhysicalOperator opDefn, ControlsInjector injector) { - this(fragContext, opContext.getAllocator(), opContext.getStats(), opDefn, injector); - } - - public OperExecContextImpl(FragmentExecContext fragContext, BufferAllocator allocator, OperatorStatReceiver stats, PhysicalOperator opDefn, ControlsInjector injector) { - this.fragmentContext = fragContext; - this.operDefn = opDefn; - this.injector = injector; - this.allocator = allocator; - this.stats = stats; - } - - @Override - public FunctionImplementationRegistry getFunctionRegistry() { - return fragmentContext.getFunctionRegistry(); - } - - @Override - public OptionSet getOptionSet() { - return fragmentContext.getOptionSet(); - } - - @Override - public <T> T getImplementationClass(ClassGenerator<T> cg) - throws ClassTransformationException, IOException { - return fragmentContext.getImplementationClass(cg); - } - - @Override - public <T> T getImplementationClass(CodeGenerator<T> cg) - throws ClassTransformationException, IOException { - return fragmentContext.getImplementationClass(cg); - } - - @Override - public <T> List<T> getImplementationClass(ClassGenerator<T> cg, - int instanceCount) throws ClassTransformationException, IOException { - return fragmentContext.getImplementationClass(cg, instanceCount); - } - - @Override - public <T> List<T> getImplementationClass(CodeGenerator<T> cg, - int instanceCount) throws ClassTransformationException, IOException { - return fragmentContext.getImplementationClass(cg, instanceCount); - } - - @Override - public boolean shouldContinue() { - return fragmentContext.shouldContinue(); - } - - @Override - public ExecutionControls getExecutionControls() { - return fragmentContext.getExecutionControls(); - } - - @Override - public BufferAllocator getAllocator() { - return allocator; - } - - @Override - public OperatorStatReceiver getStats() { - return stats; - } - - @SuppressWarnings("unchecked") - @Override - public <T extends PhysicalOperator> T getOperatorDefn() { - return (T) operDefn; - } - - @Override - public DrillConfig getConfig() { - return fragmentContext.getConfig(); - } - - @Override - public ControlsInjector getInjector() { - return injector; - } - - @Override - public void injectUnchecked(String desc) { - ExecutionControls executionControls = fragmentContext.getExecutionControls(); - if (injector != null && executionControls != null) { - injector.injectUnchecked(executionControls, desc); - } - } - - @Override - public <T extends Throwable> void injectChecked(String desc, Class<T> exceptionClass) - throws T { - ExecutionControls executionControls = fragmentContext.getExecutionControls(); - if (injector != null && executionControls != null) { - injector.injectChecked(executionControls, desc, exceptionClass); - } - } - -} http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java index b248d5f..37653e0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java @@ -21,13 +21,70 @@ import java.io.IOException; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.testing.ControlsInjector; +import org.apache.drill.exec.testing.ExecutionControls; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import com.google.common.util.concurrent.ListenableFuture; -public interface OperatorContext extends OperatorExecContext { +import io.netty.buffer.DrillBuf; + +/** + * Per-operator services available for operator implementations. + * The services allow access to the operator definition, to the + * fragment context, and to per-operator services. + * <p> + * Use this interface in code to allow unit tests to provide + * test-time implementations of this context. + */ + +public interface OperatorContext { + + /** + * Return the physical operator definition created by the planner and passed + * into the Drillbit executing the query. + * @return the physical operator definition + */ + + <T extends PhysicalOperator> T getOperatorDefn(); + + /** + * Return the memory allocator for this operator. + * + * @return the per-operator memory allocator + */ + + BufferAllocator getAllocator(); + + FragmentContextInterface getFragmentContext(); + + DrillBuf replace(DrillBuf old, int newSize); + + DrillBuf getManagedBuffer(); + + DrillBuf getManagedBuffer(int size); + + ExecutionControls getExecutionControls(); + + /** + * A write-only interface to the Drill statistics mechanism. Allows + * operators to update statistics. + * @return operator statistics + */ + + OperatorStatReceiver getStatsWriter(); + + /** + * Full operator stats (for legacy code). Prefer + * <tt>getStatsWriter()</tt> to allow code to easily run in a + * test environment. + * + * @return operator statistics + */ OperatorStats getStats(); @@ -51,4 +108,40 @@ public interface OperatorContext extends OperatorExecContext { */ <RESULT> ListenableFuture<RESULT> runCallableAs(UserGroupInformation proxyUgi, Callable<RESULT> callable); - } + + void setInjector(ControlsInjector injector); + + /** + * Returns the fault injection mechanism used to introduce faults at runtime + * for testing. + * @return the fault injector + */ + + ControlsInjector getInjector(); + + /** + * Insert an unchecked fault (exception). Handles the details of checking if + * fault injection is enabled and this particular fault is selected. + * @param desc the description of the fault used to match a fault + * injection parameter to determine if the fault should be injected + * @throws RuntimeException an unchecked exception if the fault is enabled + */ + + void injectUnchecked(String desc); + + /** + * Insert a checked fault (exception) of the given class. Handles the details + * of checking if fault injection is enabled and this particular fault is + * selected. + * + * @param desc the description of the fault used to match a fault + * injection parameter to determine if the fault should be injected + * @param exceptionClass the class of exeception to be thrown + * @throws T if the fault is enabled + */ + + <T extends Throwable> void injectChecked(String desc, Class<T> exceptionClass) + throws T; + + void close(); +} http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java index 37c609e..bc85c39 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java @@ -17,33 +17,23 @@ */ package org.apache.drill.exec.ops; -import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.work.WorkManager; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; -import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -class OperatorContextImpl extends AbstractOperatorExecContext implements OperatorContext, AutoCloseable { +class OperatorContextImpl extends BaseOperatorContext implements AutoCloseable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class); private boolean closed = false; private final OperatorStats stats; - private DrillFileSystem fs; - private final ExecutorService executor; - private final ExecutorService scanExecutor; - private final ExecutorService scanDecodeExecutor; /** * This lazily initialized executor service is used to submit a {@link Callable task} that needs a proxy user. There @@ -59,9 +49,10 @@ class OperatorContextImpl extends AbstractOperatorExecContext implements Operato public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats) throws OutOfMemoryException { - super(context.getNewChildAllocator(popConfig.getClass().getSimpleName(), - popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation()), - popConfig, context.getExecutionControls(), stats); + super(context, + context.getNewChildAllocator(popConfig.getClass().getSimpleName(), + popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation()), + popConfig); if (stats != null) { this.stats = stats; } else { @@ -70,25 +61,6 @@ class OperatorContextImpl extends AbstractOperatorExecContext implements Operato OperatorUtilities.getChildCount(popConfig)); this.stats = context.getStats().newOperatorStats(def, allocator); } - executor = context.getDrillbitContext().getExecutor(); - scanExecutor = context.getDrillbitContext().getScanExecutor(); - scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor(); - } - - // Allow an operator to use the thread pool - @Override - public ExecutorService getExecutor() { - return executor; - } - - @Override - public ExecutorService getScanExecutor() { - return scanExecutor; - } - - @Override - public ExecutorService getScanDecodeExecutor() { - return scanDecodeExecutor; } public boolean isClosed() { @@ -98,23 +70,15 @@ class OperatorContextImpl extends AbstractOperatorExecContext implements Operato @Override public void close() { if (closed) { - logger.debug("Attempted to close Operator context for {}, but context is already closed", popConfig != null ? popConfig.getClass().getName() : null); + logger.debug("Attempted to close Operator context for {}, but context is already closed", popConfig != null ? getName() : null); return; } - logger.debug("Closing context for {}", popConfig != null ? popConfig.getClass().getName() : null); + logger.debug("Closing context for {}", popConfig != null ? getName() : null); - closed = true; try { super.close(); } finally { - if (fs != null) { - try { - fs.close(); - fs = null; - } catch (IOException e) { - throw new DrillRuntimeException(e); - } - } + closed = true; } } @@ -124,11 +88,16 @@ class OperatorContextImpl extends AbstractOperatorExecContext implements Operato } @Override + public OperatorStatReceiver getStatsWriter() { + return stats; + } + + @Override public <RESULT> ListenableFuture<RESULT> runCallableAs(final UserGroupInformation proxyUgi, final Callable<RESULT> callable) { synchronized (this) { if (delegatePool == null) { - delegatePool = MoreExecutors.listeningDecorator(executor); + delegatePool = MoreExecutors.listeningDecorator(getExecutor()); } } return delegatePool.submit(new Callable<RESULT>() { @@ -152,21 +121,4 @@ class OperatorContextImpl extends AbstractOperatorExecContext implements Operato } }); } - - @Override - public DrillFileSystem newFileSystem(Configuration conf) throws IOException { - Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext"); - fs = new DrillFileSystem(conf, getStats()); - return fs; - } - - /** - * Creates a DrillFileSystem that does not automatically track operator stats. - */ - @Override - public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException { - Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext"); - fs = new DrillFileSystem(conf, null); - return fs; - } } http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java deleted file mode 100644 index 4d64aba..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorExecContext.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.ops; - -import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.testing.ExecutionControls; - -import io.netty.buffer.DrillBuf; - -/** - * Narrowed version of the {@link OperatorContext} used to create an - * easy-to-test version of the operator context that excludes services - * that require a full Drillbit server. - */ - -public interface OperatorExecContext { - - DrillBuf replace(DrillBuf old, int newSize); - - DrillBuf getManagedBuffer(); - - DrillBuf getManagedBuffer(int size); - - BufferAllocator getAllocator(); - - ExecutionControls getExecutionControls(); - - OperatorStatReceiver getStatsWriter(); - - void close(); -} http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStatReceiver.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStatReceiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStatReceiver.java index 6aa8d76..4dba2c0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStatReceiver.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStatReceiver.java @@ -67,4 +67,8 @@ public interface OperatorStatReceiver { */ void setDoubleStat(MetricDef metric, double value); + + void startWait(); + + void stopWait(); } http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java index b3c9ff9..1b96f28 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java @@ -169,6 +169,7 @@ public class OperatorStats implements OperatorStatReceiver { inProcessing = false; } + @Override public synchronized void startWait() { assert !inWait : assertionError("starting waiting"); stopProcessing(); @@ -176,6 +177,7 @@ public class OperatorStats implements OperatorStatReceiver { waitMark = System.nanoTime(); } + @Override public synchronized void stopWait() { assert inWait : assertionError("stopping waiting"); startProcessing(); http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 6c2c171..77e9ea4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -32,7 +32,6 @@ import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; -import org.apache.drill.exec.ops.OperatorExecContext; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -319,9 +318,9 @@ public class ScanBatch implements CloseableRecordBatch { private final VectorContainer container; - private final OperatorExecContext oContext; + private final OperatorContext oContext; - public Mutator(OperatorExecContext oContext, BufferAllocator allocator, VectorContainer container) { + public Mutator(OperatorContext oContext, BufferAllocator allocator, VectorContainer container) { this.oContext = oContext; this.allocator = allocator; this.container = container; http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java index ccaca98..733ea5e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java @@ -19,12 +19,12 @@ package org.apache.drill.exec.physical.impl.xsort; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentExecContext; +import org.apache.drill.exec.ops.FragmentContextInterface; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.selection.SelectionVector2; public interface SingleBatchSorter { - public void setup(FragmentExecContext context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException; + public void setup(FragmentContextInterface context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException; public void sort(SelectionVector2 vector2) throws SchemaChangeException; public static TemplateClassDefinition<SingleBatchSorter> TEMPLATE_DEFINITION = http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java index 672dd2b..0f4680d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit; import javax.inject.Named; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentExecContext; +import org.apache.drill.exec.ops.FragmentContextInterface; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.selection.SelectionVector2; @@ -38,7 +38,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In private SelectionVector2 vector2; @Override - public void setup(FragmentExecContext context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException{ + public void setup(FragmentContextInterface context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException{ Preconditions.checkNotNull(vector2); this.vector2 = vector2; try { @@ -76,7 +76,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In } } - public abstract void doSetup(@Named("context") FragmentExecContext context, + public abstract void doSetup(@Named("context") FragmentContextInterface context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException; http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java index 1f381b9..338462e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseSortWrapper.java @@ -28,7 +28,7 @@ import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; -import org.apache.drill.exec.ops.OperExecContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.record.VectorAccessible; @@ -45,7 +45,7 @@ public abstract class BaseSortWrapper extends BaseWrapper { protected static final MappingSet LEFT_MAPPING = new MappingSet("leftIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); protected static final MappingSet RIGHT_MAPPING = new MappingSet("rightIndex", null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); - public BaseSortWrapper(OperExecContext opContext) { + public BaseSortWrapper(OperatorContext opContext) { super(opContext); } @@ -56,7 +56,8 @@ public abstract class BaseSortWrapper extends BaseWrapper { for (Ordering od : popConfig.getOrderings()) { // first, we rewrite the evaluation stack for each side of the comparison. ErrorCollector collector = new ErrorCollectorImpl(); - final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, context.getFunctionRegistry()); + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, + context.getFragmentContext().getFunctionRegistry()); if (collector.hasErrors()) { throw UserException.unsupportedError() .message("Failure while materializing expression. " + collector.toErrorString()) @@ -71,7 +72,7 @@ public abstract class BaseSortWrapper extends BaseWrapper { // next we wrap the two comparison sides and add the expression block for the comparison. LogicalExpression fh = FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right, - context.getFunctionRegistry()); + context.getFragmentContext().getFunctionRegistry()); HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE); JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java index e607f40..0287059 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BaseWrapper.java @@ -22,7 +22,7 @@ import java.io.IOException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.expr.CodeGenerator; -import org.apache.drill.exec.ops.OperExecContext; +import org.apache.drill.exec.ops.OperatorContext; /** * Base class for code-generation-based tasks. @@ -30,15 +30,15 @@ import org.apache.drill.exec.ops.OperExecContext; public abstract class BaseWrapper { - protected OperExecContext context; + protected OperatorContext context; - public BaseWrapper(OperExecContext context) { + public BaseWrapper(OperatorContext context) { this.context = context; } protected <T> T getInstance(CodeGenerator<T> cg, org.slf4j.Logger logger) { try { - return context.getImplementationClass(cg); + return context.getFragmentContext().getImplementationClass(cg); } catch (ClassTransformationException e) { throw UserException.unsupportedError(e) .message("Code generation error - likely code error.") http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java index c930877..f26f6b8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BufferedBatches.java @@ -24,7 +24,7 @@ import java.util.List; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.OperExecContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch; import org.apache.drill.exec.record.BatchSchema; @@ -55,9 +55,9 @@ public class BufferedBatches { private BatchSchema schema; - private final OperExecContext context; + private final OperatorContext context; - public BufferedBatches(OperExecContext opContext) { + public BufferedBatches(OperatorContext opContext) { context = opContext; sorterWrapper = new SorterWrapper(opContext); } http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java index 6a97c29..2054c9b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java @@ -20,8 +20,6 @@ package org.apache.drill.exec.physical.impl.xsort.managed; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; -import org.apache.drill.exec.ops.OperExecContext; -import org.apache.drill.exec.ops.OperExecContextImpl; import org.apache.drill.exec.physical.config.ExternalSort; import org.apache.drill.exec.physical.impl.spill.SpillSet; import org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator; @@ -218,10 +216,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { SortConfig sortConfig = new SortConfig(context.getConfig()); SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(), popConfig); - OperExecContext opContext = new OperExecContextImpl(context, oContext, popConfig, injector); - PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext); - SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copierHolder); - sortImpl = new SortImpl(opContext, sortConfig, spilledRuns, container); + oContext.setInjector(injector); + PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(oContext); + SpilledRuns spilledRuns = new SpilledRuns(oContext, spillSet, copierHolder); + sortImpl = new SortImpl(oContext, sortConfig, spilledRuns, container); // The upstream operator checks on record count before we have // results. Create an empty result set temporarily to handle http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java index 5b07c4a..698e32f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java @@ -24,7 +24,7 @@ import javax.inject.Named; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BaseAllocator; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.FragmentExecContext; +import org.apache.drill.exec.ops.FragmentContextInterface; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector4; @@ -49,7 +49,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { */ private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue(); - private FragmentExecContext context; + private FragmentContextInterface context; /** * Controls the maximum size of batches exposed to downstream @@ -57,7 +57,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { private int desiredRecordBatchCount; @Override - public void setup(final FragmentExecContext context, final BufferAllocator allocator, final SelectionVector4 vector4, + public void setup(final FragmentContextInterface context, final BufferAllocator allocator, final SelectionVector4 vector4, final VectorContainer hyperBatch, int outputBatchSize, int desiredBatchSize) throws SchemaChangeException{ // we pass in the local hyperBatch since that is where we'll be reading data. Preconditions.checkNotNull(vector4); @@ -233,7 +233,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { } } - public abstract void doSetup(@Named("context") FragmentExecContext context, + public abstract void doSetup(@Named("context") FragmentContextInterface context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException; http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java index 71ae29e..428f6f8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.xsort.managed; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.FragmentExecContext; +import org.apache.drill.exec.ops.FragmentContextInterface; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector4; @@ -30,7 +30,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4; */ public interface MSorter { - public void setup(FragmentExecContext context, BufferAllocator allocator, SelectionVector4 vector4, + public void setup(FragmentContextInterface context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch, int outputBatchSize, int desiredBatchSize) throws SchemaChangeException; public void sort(); public SelectionVector4 getSV4(); http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java index 01135f0..f592e44 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MergeSortWrapper.java @@ -32,7 +32,7 @@ import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; -import org.apache.drill.exec.ops.OperExecContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder; @@ -87,7 +87,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults { private State state = State.FIRST; private final VectorContainer destContainer; - public MergeSortWrapper(OperExecContext opContext, VectorContainer destContainer) { + public MergeSortWrapper(OperatorContext opContext, VectorContainer destContainer) { super(opContext); this.destContainer = destContainer; } @@ -123,7 +123,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults { sv4 = builder.getSv4(); Sort popConfig = context.getOperatorDefn(); mSorter = createNewMSorter(popConfig.getOrderings(), MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING); - mSorter.setup(context, context.getAllocator(), sv4, destContainer, sv4.getCount(), outputBatchSize); + mSorter.setup(context.getFragmentContext(), context.getAllocator(), sv4, destContainer, sv4.getCount(), outputBatchSize); } catch (SchemaChangeException e) { throw UserException.unsupportedError(e) .message("Unexpected schema change - likely code error.") @@ -142,7 +142,9 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults { } private MSorter createNewMSorter(List<Ordering> orderings, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) { - CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptionSet()); + CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, + context.getFragmentContext().getFunctionRegistry(), + context.getFragmentContext().getOptionSet()); cg.plainJavaCapable(true); // Uncomment out this line to debug the generated code. @@ -153,7 +155,8 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults { for (Ordering od : orderings) { // first, we rewrite the evaluation stack for each side of the comparison. ErrorCollector collector = new ErrorCollectorImpl(); - final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), destContainer, collector, context.getFunctionRegistry()); + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), destContainer, collector, + context.getFragmentContext().getFunctionRegistry()); if (collector.hasErrors()) { throw UserException.unsupportedError() .message("Failure while materializing expression. " + collector.toErrorString()) @@ -168,7 +171,7 @@ public class MergeSortWrapper extends BaseSortWrapper implements SortResults { // next we wrap the two comparison sides and add the expression block for the comparison. LogicalExpression fh = FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right, - context.getFunctionRegistry()); + context.getFragmentContext().getFunctionRegistry()); HoldingContainer out = g.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE); JConditional jc = g.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0))); http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java index 88686e5..ab8cc9a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java @@ -30,14 +30,14 @@ import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.OperExecContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.record.VectorInitializer; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorAccessibleUtilities; import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorInitializer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; @@ -66,7 +66,7 @@ public class PriorityQueueCopierWrapper extends BaseSortWrapper { private PriorityQueueCopier copier; - public PriorityQueueCopierWrapper(OperExecContext opContext) { + public PriorityQueueCopierWrapper(OperatorContext opContext) { super(opContext); } @@ -80,7 +80,9 @@ public class PriorityQueueCopierWrapper extends BaseSortWrapper { private PriorityQueueCopier newCopier(VectorAccessible batch) { // Generate the copier code and obtain the resulting class - CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptionSet()); + CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, + context.getFragmentContext().getFunctionRegistry(), + context.getFragmentContext().getOptionSet()); ClassGenerator<PriorityQueueCopier> g = cg.getRoot(); cg.plainJavaCapable(true); // Uncomment out this line to debug the generated code. http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java index d2b589c..2d53c3b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java @@ -21,17 +21,17 @@ import java.io.IOException; import java.util.List; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.OperExecContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; import org.apache.drill.exec.physical.impl.xsort.MSortTemplate; import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch; import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeTask; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.record.VectorInitializer; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorAccessibleUtilities; import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorInitializer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; @@ -174,7 +174,7 @@ public class SortImpl { private final SortMetrics metrics; private final SortMemoryManager memManager; private VectorContainer outputBatch; - private OperExecContext context; + private OperatorContext context; /** * Memory allocator for this operator itself. Incoming batches are @@ -192,7 +192,7 @@ public class SortImpl { private VectorInitializer allocHelper; - public SortImpl(OperExecContext opContext, SortConfig sortConfig, + public SortImpl(OperatorContext opContext, SortConfig sortConfig, SpilledRuns spilledRuns, VectorContainer batch) { this.context = opContext; outputBatch = batch; @@ -200,7 +200,7 @@ public class SortImpl { allocator = opContext.getAllocator(); config = sortConfig; memManager = new SortMemoryManager(config, allocator.getLimit()); - metrics = new SortMetrics(opContext.getStats()); + metrics = new SortMetrics(opContext.getStatsWriter()); bufferedBatches = new BufferedBatches(opContext); // Request leniency from the allocator. Leniency http://git-wip-us.apache.org/repos/asf/drill/blob/c56de2f1/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java index 1233de8..8d20cca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java @@ -44,6 +44,7 @@ public class SortMetrics { private long writeBytes; public SortMetrics(OperatorStatReceiver stats) { + assert stats != null; this.stats = stats; }