Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1045#discussion_r156193025 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java --- @@ -17,483 +17,216 @@ */ package org.apache.drill.exec.ops; +import java.io.IOException; +import java.util.Collection; import java.util.List; -import java.util.Map; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import com.google.common.annotations.VisibleForTesting; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.compile.CodeCompiler; -import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; -import org.apache.drill.exec.expr.holders.ValueHolder; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.planner.physical.PlannerSettings; -import org.apache.drill.exec.proto.BitControl.PlanFragment; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; -import org.apache.drill.exec.proto.helper.QueryIdHelper; -import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.rpc.RpcOutcomeListener; -import org.apache.drill.exec.rpc.UserClientConnection; -import 
org.apache.drill.exec.rpc.control.ControlTunnel; -import org.apache.drill.exec.rpc.control.WorkEventBus; -import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.server.options.FragmentOptionManager; -import org.apache.drill.exec.server.options.OptionList; -import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry; +import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.ExecProtos; +import org.apache.drill.exec.rpc.control.Controller; import org.apache.drill.exec.server.options.OptionSet; -import org.apache.drill.exec.store.PartitionExplorer; -import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.testing.ExecutionControls; -import org.apache.drill.exec.util.ImpersonationUtil; -import org.apache.drill.exec.work.batch.IncomingBuffers; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.work.batch.IncomingBuffers; /** - * Contextual objects required for execution of a particular fragment. - * This is the implementation; use <tt>FragmentContextInterface</tt> - * in code to allow tests to use test-time implementations. + * Fragment context interface: separates implementation from definition. + * Allows unit testing by mocking or reimplementing services with + * test-time versions. The name is awkward, chosen to avoid renaming + * the implementation class which is used in many places in legacy code. + * New code should use this interface, and the names should eventually + * be swapped with {@link FragmentContextImpl} becoming + * <tt>FragmentContextImpl</tt> and this interface becoming + * {@link FragmentContextImpl}. 
*/ -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class); - - private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap(); - private final List<OperatorContextImpl> contexts = Lists.newLinkedList(); - - private final DrillbitContext context; - private final UserClientConnection connection; // is null if this context is for non-root fragment - private final QueryContext queryContext; // is null if this context is for non-root fragment - private final FragmentStats stats; - private final BufferAllocator allocator; - private final PlanFragment fragment; - private final ContextInformation contextInformation; - private IncomingBuffers buffers; - private final OptionManager fragmentOptions; - private final BufferManager bufferManager; - private ExecutorState executorState; - private final ExecutionControls executionControls; - - private final SendingAccountor sendingAccountor = new SendingAccountor(); - private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() { - @Override - public void accept(final RpcException e) { - fail(e); - } - - @Override - public void interrupt(final InterruptedException e) { - if (shouldContinue()) { - logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e); - fail(e); - } - } - }; - - private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor); - private final AccountingUserConnection accountingUserConnection; - /** Stores constants and their holders by type */ - private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache; - +public interface FragmentContext extends UdfUtilities, AutoCloseable { /** - * Create a FragmentContext instance for non-root fragment. - * - * @param dbContext DrillbitContext. 
- * @param fragment Fragment implementation. - * @param funcRegistry FunctionImplementationRegistry. - * @throws ExecutionSetupException + * Wait for ack that all outgoing batches have been sent */ - public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, - final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException { - this(dbContext, fragment, null, null, funcRegistry); - } + void waitForSendComplete(); /** - * Create a FragmentContext instance for root fragment. - * - * @param dbContext DrillbitContext. - * @param fragment Fragment implementation. - * @param queryContext QueryContext. - * @param connection UserClientConnection. - * @param funcRegistry FunctionImplementationRegistry. - * @throws ExecutionSetupException + * Returns the UDF registry. + * @return the UDF registry */ - public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext, - final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry) - throws ExecutionSetupException { - super(funcRegistry); - this.context = dbContext; - this.queryContext = queryContext; - this.connection = connection; - this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler); - this.fragment = fragment; - contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext()); - - logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial()); - logger.debug("Fragment max allocation: {}", fragment.getMemMax()); - - final OptionList list; - if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) { - list = new OptionList(); - } else { - try { - list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class); - } catch (final Exception e) { - throw new ExecutionSetupException("Failure while reading plan options.", e); - } - } - fragmentOptions = new 
FragmentOptionManager(context.getOptionManager(), list); - - executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint()); - - // Add the fragment context to the root allocator. - // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments - try { - allocator = context.getAllocator().newChildAllocator( - "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()), - fragment.getMemInitial(), - fragment.getMemMax()); - Preconditions.checkNotNull(allocator, "Unable to acuqire allocator"); - } catch (final OutOfMemoryException e) { - throw UserException.memoryError(e) - .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId()) - .build(logger); - } catch(final Throwable e) { - throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e); - } - - stats = new FragmentStats(allocator, fragment.getAssignment()); - bufferManager = new BufferManagerImpl(this.allocator); - constantValueHolderCache = Maps.newHashMap(); - } + FunctionImplementationRegistry getFunctionRegistry(); /** - * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying - * the long list of test files. + * Returns a read-only version of the session options. 
+ * @return the session options */ - public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection, - FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException { - this(dbContext, fragment, null, connection, funcRegistry); - } + OptionSet getOptionSet(); - public OptionManager getOptions() { - return fragmentOptions; - } + PhysicalPlanReader getPlanReader(); - @Override - public OptionSet getOptionSet() { - return fragmentOptions; - } + ClusterCoordinator getClusterCoordinator(); - public void setBuffers(final IncomingBuffers buffers) { - Preconditions.checkArgument(this.buffers == null, "Can only set buffers once."); - this.buffers = buffers; - } + AccountingDataTunnel getDataTunnel(final CoordinationProtos.DrillbitEndpoint endpoint); - public void setExecutorState(final ExecutorState executorState) { - Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be set once."); - this.executorState = executorState; - } + AccountingUserConnection getUserDataTunnel(); - public void fail(final Throwable cause) { - executorState.fail(cause); - } + void setBuffers(final IncomingBuffers buffers); + + boolean isImpersonationEnabled(); /** - * Tells individual operations whether they should continue. In some cases, an external event (typically cancellation) - * will mean that the fragment should prematurely exit execution. Long running operations should check this every so - * often so that Drill is responsive to cancellation operations. + * Generates code for a class given a {@link ClassGenerator}, + * and returns a single instance of the generated class. (Note + * that the name is a misnomer, it would be better called + * <tt>getImplementationInstance</tt>.) * - * @return false if the action should terminate immediately, true if everything is okay. 
+ * @param cg the class generator + * @return an instance of the generated class */ - @Override - public boolean shouldContinue() { - return executorState.shouldContinue(); - } - - @Override - public DrillbitContext getDrillbitContext() { - return context; - } + <T> T getImplementationClass(final ClassGenerator<T> cg) + throws ClassTransformationException, IOException; /** - * This method is only used to construt InfoSchemaReader, it is for the reader to get full schema, so here we - * are going to return a fully initialized schema tree. - * @return root schema's plus + * Generates code for a class given a {@link CodeGenerator}, + * and returns a single instance of the generated class. (Note + * that the name is a misnomer, it would be better called + * <tt>getImplementationInstance</tt>.) + * + * @param cg the code generator + * @return an instance of the generated class */ - public SchemaPlus getFullRootSchema() { - if (queryContext == null) { - fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " + - "This is a non-root fragment.")); - return null; - } - - final boolean isImpersonationEnabled = isImpersonationEnabled(); - // If impersonation is enabled, we want to view the schema as query user and suppress authorization errors. As for - // InfoSchema purpose we want to show tables the user has permissions to list or query. If impersonation is - // disabled view the schema as Drillbit process user and throw authorization errors to client. - SchemaConfig schemaConfig = SchemaConfig - .newBuilder( - isImpersonationEnabled ? queryContext.getQueryUserName() : ImpersonationUtil.getProcessUserName(), - queryContext) - .setIgnoreAuthErrors(isImpersonationEnabled) - .build(); - - return queryContext.getFullRootSchema(schemaConfig); - } + <T> T getImplementationClass(final CodeGenerator<T> cg) + throws ClassTransformationException, IOException; /** - * Get this node's identity. - * @return A DrillbitEndpoint object. 
+ * Generates code for a class given a {@link ClassGenerator}, and returns the + * specified number of instances of the generated class. (Note that the name + * is a misnomer, it would be better called + * <tt>getImplementationInstances</tt>.) + * + * @param cg the class generator + * @return list of instances of the generated class */ - public DrillbitEndpoint getIdentity() { - return context.getEndpoint(); - } - - public FragmentStats getStats() { - return stats; - } - - @Override - public ContextInformation getContextInformation() { - return contextInformation; - } - - public DrillbitEndpoint getForemanEndpoint() { - return fragment.getForeman(); - } + <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int instanceCount) + throws ClassTransformationException, IOException; /** - * The FragmentHandle for this Fragment - * @return FragmentHandle + * Generates code for a class given a {@link CodeGenerator}, and returns the + * specified number of instances of the generated class. (Note that the name + * is a misnomer, it would be better called + * <tt>getImplementationInstances</tt>.) + * + * @param cg the code generator + * @return list of instances of the generated class */ - public FragmentHandle getHandle() { - return fragment.getHandle(); - } - - public String getFragIdString() { - final FragmentHandle handle = getHandle(); - final String frag = handle != null ? handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId() : "0:0"; - return frag; - } + <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int instanceCount) + throws ClassTransformationException, IOException; /** - * Get this fragment's allocator. - * @return the allocator + * Return the set of execution controls used to inject faults into running + * code for testing. 
+ * + * @return the execution controls */ - @Deprecated - public BufferAllocator getAllocator() { - if (allocator == null) { - logger.debug("Fragment: " + getFragIdString() + " Allocator is NULL"); - } - return allocator; - } + ExecutionControls getExecutionControls(); - public BufferAllocator getNewChildAllocator(final String operatorName, - final int operatorId, - final long initialReservation, - final long maximumReservation) throws OutOfMemoryException { - return allocator.newChildAllocator( - "op:" + QueryIdHelper.getFragmentId(fragment.getHandle()) + ":" + operatorId + ":" + operatorName, - initialReservation, - maximumReservation - ); - } + /** + * Returns the Drill configuration for this run. Note that the config is + * global and immutable. + * + * @return the Drill configuration + */ + DrillConfig getConfig(); - public boolean isOverMemoryLimit() { - return allocator.isOverLimit(); - } + FragmentStats getStats(); - @Override - protected CodeCompiler getCompiler() { - return context.getCompiler(); - } + CodeCompiler getCompiler(); - public AccountingUserConnection getUserDataTunnel() { - Preconditions.checkState(connection != null, "Only Root fragment can get UserDataTunnel"); - return accountingUserConnection; - } + Collection<CoordinationProtos.DrillbitEndpoint> getBits(); - public ControlTunnel getControlTunnel(final DrillbitEndpoint endpoint) { - return context.getController().getTunnel(endpoint); - } + CoordinationProtos.DrillbitEndpoint getForemanEndpoint(); - public AccountingDataTunnel getDataTunnel(final DrillbitEndpoint endpoint) { - AccountingDataTunnel tunnel = tunnels.get(endpoint); - if (tunnel == null) { - tunnel = new AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint), sendingAccountor, statusHandler); - tunnels.put(endpoint, tunnel); - } - return tunnel; - } + CoordinationProtos.DrillbitEndpoint getEndpoint(); - public IncomingBuffers getBuffers() { - return buffers; - } + Controller getController(); - public 
OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats stats) - throws OutOfMemoryException { - OperatorContextImpl context = new OperatorContextImpl(popConfig, this, stats); - contexts.add(context); - return context; - } + OperatorCreatorRegistry getOperatorCreatorRegistry(); --- End diff -- Needed only when creating operators, not when executing a fragment, so this is network/cluster related. Rather than continuing to tag each item, please review the original interface class, where I did this analysis to decide which methods should be in the runtime interface and which are network/cluster/server related (and so I left them in the implementation class). Since we have no design doc, perhaps we can at least document these decisions and concepts in the class header comments. (I suppose I should have done that; rereading the comment, it clearly lacks this level of detail.)
---