Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1045#discussion_r156490496 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java --- @@ -17,483 +17,216 @@ */ package org.apache.drill.exec.ops; +import java.io.IOException; +import java.util.Collection; import java.util.List; -import java.util.Map; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import com.google.common.annotations.VisibleForTesting; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.compile.CodeCompiler; -import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; -import org.apache.drill.exec.expr.holders.ValueHolder; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.planner.physical.PlannerSettings; -import org.apache.drill.exec.proto.BitControl.PlanFragment; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; -import org.apache.drill.exec.proto.helper.QueryIdHelper; -import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.rpc.RpcOutcomeListener; -import org.apache.drill.exec.rpc.UserClientConnection; -import 
org.apache.drill.exec.rpc.control.ControlTunnel; -import org.apache.drill.exec.rpc.control.WorkEventBus; -import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.server.options.FragmentOptionManager; -import org.apache.drill.exec.server.options.OptionList; -import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry; +import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.ExecProtos; +import org.apache.drill.exec.rpc.control.Controller; import org.apache.drill.exec.server.options.OptionSet; -import org.apache.drill.exec.store.PartitionExplorer; -import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.testing.ExecutionControls; -import org.apache.drill.exec.util.ImpersonationUtil; -import org.apache.drill.exec.work.batch.IncomingBuffers; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.work.batch.IncomingBuffers; /** - * Contextual objects required for execution of a particular fragment. - * This is the implementation; use <tt>FragmentContextInterface</tt> - * in code to allow tests to use test-time implementations. + * Fragment context interface: separates implementation from definition. + * Allows unit testing by mocking or reimplementing services with + * test-time versions. The name is awkward, chosen to avoid renaming + * the implementation class which is used in many places in legacy code. + * New code should use this interface, and the names should eventually + * be swapped with {@link FragmentContextImpl} becoming + * <tt>FragmentContextImpl</tt> and this interface becoming + * {@link FragmentContextImpl}. 
*/ -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class); - - private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap(); - private final List<OperatorContextImpl> contexts = Lists.newLinkedList(); - - private final DrillbitContext context; - private final UserClientConnection connection; // is null if this context is for non-root fragment - private final QueryContext queryContext; // is null if this context is for non-root fragment - private final FragmentStats stats; - private final BufferAllocator allocator; - private final PlanFragment fragment; - private final ContextInformation contextInformation; - private IncomingBuffers buffers; - private final OptionManager fragmentOptions; - private final BufferManager bufferManager; - private ExecutorState executorState; - private final ExecutionControls executionControls; - - private final SendingAccountor sendingAccountor = new SendingAccountor(); - private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() { - @Override - public void accept(final RpcException e) { - fail(e); - } - - @Override - public void interrupt(final InterruptedException e) { - if (shouldContinue()) { - logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e); - fail(e); - } - } - }; - - private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor); - private final AccountingUserConnection accountingUserConnection; - /** Stores constants and their holders by type */ - private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache; - +public interface FragmentContext extends UdfUtilities, AutoCloseable { /** - * Create a FragmentContext instance for non-root fragment. - * - * @param dbContext DrillbitContext. 
- * @param fragment Fragment implementation. - * @param funcRegistry FunctionImplementationRegistry. - * @throws ExecutionSetupException + * Wait for ack that all outgoing batches have been sent */ - public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, - final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException { - this(dbContext, fragment, null, null, funcRegistry); - } + void waitForSendComplete(); /** - * Create a FragmentContext instance for root fragment. - * - * @param dbContext DrillbitContext. - * @param fragment Fragment implementation. - * @param queryContext QueryContext. - * @param connection UserClientConnection. - * @param funcRegistry FunctionImplementationRegistry. - * @throws ExecutionSetupException + * Returns the UDF registry. + * @return the UDF registry */ - public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext, - final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry) - throws ExecutionSetupException { - super(funcRegistry); - this.context = dbContext; - this.queryContext = queryContext; - this.connection = connection; - this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler); - this.fragment = fragment; - contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext()); - - logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial()); - logger.debug("Fragment max allocation: {}", fragment.getMemMax()); - - final OptionList list; - if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) { - list = new OptionList(); - } else { - try { - list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class); - } catch (final Exception e) { - throw new ExecutionSetupException("Failure while reading plan options.", e); - } - } - fragmentOptions = new 
FragmentOptionManager(context.getOptionManager(), list); - - executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint()); - - // Add the fragment context to the root allocator. - // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments - try { - allocator = context.getAllocator().newChildAllocator( - "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()), - fragment.getMemInitial(), - fragment.getMemMax()); - Preconditions.checkNotNull(allocator, "Unable to acuqire allocator"); - } catch (final OutOfMemoryException e) { - throw UserException.memoryError(e) - .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId()) - .build(logger); - } catch(final Throwable e) { - throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e); - } - - stats = new FragmentStats(allocator, fragment.getAssignment()); - bufferManager = new BufferManagerImpl(this.allocator); - constantValueHolderCache = Maps.newHashMap(); - } + FunctionImplementationRegistry getFunctionRegistry(); --- End diff -- This method is required because it's used by operators like join, hashagg, filter, and flatten. I've now replaced FunctionImplementationRegistry with the FunctionLookupContext interface, so it has no dependence on ZK or networking.
---