Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156490496
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements 
AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = 
Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this 
context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is 
for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new 
Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for 
the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new 
StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> 
constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final 
PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws 
ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final 
PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final 
FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new 
AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), 
fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", 
fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) 
{
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = 
dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), 
OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan 
options.", e);
    -      }
    -    }
    -    fragmentOptions = new 
FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, 
dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all 
the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + 
getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory 
allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
    --- End diff --
    
    This method is required because it is used by operators such as join, hashagg, 
filter, and flatten. I have now replaced FunctionImplementationRegistry with 
the FunctionLookupContext interface, so it has no dependency on ZK or 
networking.


---

Reply via email to