Github user ilooner commented on a diff in the pull request:
https://github.com/apache/drill/pull/1045#discussion_r156538123
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
@@ -17,483 +17,216 @@
*/
package org.apache.drill.exec.ops;
+import java.io.IOException;
+import java.util.Collection;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.compile.CodeCompiler;
-import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
-import org.apache.drill.exec.expr.holders.ValueHolder;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.planner.physical.PlannerSettings;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import org.apache.drill.exec.rpc.UserClientConnection;
-import org.apache.drill.exec.rpc.control.ControlTunnel;
-import org.apache.drill.exec.rpc.control.WorkEventBus;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.FragmentOptionManager;
-import org.apache.drill.exec.server.options.OptionList;
-import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.server.options.OptionSet;
-import org.apache.drill.exec.store.PartitionExplorer;
-import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.testing.ExecutionControls;
-import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.drill.exec.work.batch.IncomingBuffers;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
/**
- * Contextual objects required for execution of a particular fragment.
- * This is the implementation; use <tt>FragmentContextInterface</tt>
- * in code to allow tests to use test-time implementations.
+ * Fragment context interface: separates implementation from definition.
+ * Allows unit testing by mocking or reimplementing services with
+ * test-time versions. New code should use this interface rather than
+ * the implementation class, {@link FragmentContextImpl}.
*/
-public class FragmentContext extends BaseFragmentContext implements
AutoCloseable, UdfUtilities, FragmentContextInterface {
- private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
-
- private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels =
Maps.newHashMap();
- private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
-
- private final DrillbitContext context;
- private final UserClientConnection connection; // is null if this
context is for non-root fragment
- private final QueryContext queryContext; // is null if this context is
for non-root fragment
- private final FragmentStats stats;
- private final BufferAllocator allocator;
- private final PlanFragment fragment;
- private final ContextInformation contextInformation;
- private IncomingBuffers buffers;
- private final OptionManager fragmentOptions;
- private final BufferManager bufferManager;
- private ExecutorState executorState;
- private final ExecutionControls executionControls;
-
- private final SendingAccountor sendingAccountor = new SendingAccountor();
- private final Consumer<RpcException> exceptionConsumer = new
Consumer<RpcException>() {
- @Override
- public void accept(final RpcException e) {
- fail(e);
- }
-
- @Override
- public void interrupt(final InterruptedException e) {
- if (shouldContinue()) {
- logger.error("Received an unexpected interrupt while waiting for
the data send to complete.", e);
- fail(e);
- }
- }
- };
-
- private final RpcOutcomeListener<Ack> statusHandler = new
StatusHandler(exceptionConsumer, sendingAccountor);
- private final AccountingUserConnection accountingUserConnection;
- /** Stores constants and their holders by type */
- private final Map<String, Map<MinorType, ValueHolder>>
constantValueHolderCache;
-
+public interface FragmentContext extends UdfUtilities, AutoCloseable {
/**
- * Create a FragmentContext instance for non-root fragment.
- *
- * @param dbContext DrillbitContext.
- * @param fragment Fragment implementation.
- * @param funcRegistry FunctionImplementationRegistry.
- * @throws ExecutionSetupException
+ * Waits for acknowledgement that all outgoing batches have been sent.
*/
- public FragmentContext(final DrillbitContext dbContext, final
PlanFragment fragment,
- final FunctionImplementationRegistry funcRegistry) throws
ExecutionSetupException {
- this(dbContext, fragment, null, null, funcRegistry);
- }
+ void waitForSendComplete();
/**
- * Create a FragmentContext instance for root fragment.
- *
- * @param dbContext DrillbitContext.
- * @param fragment Fragment implementation.
- * @param queryContext QueryContext.
- * @param connection UserClientConnection.
- * @param funcRegistry FunctionImplementationRegistry.
- * @throws ExecutionSetupException
+ * Returns the UDF registry.
+ * @return the UDF registry
*/
- public FragmentContext(final DrillbitContext dbContext, final
PlanFragment fragment, final QueryContext queryContext,
- final UserClientConnection connection, final
FunctionImplementationRegistry funcRegistry)
- throws ExecutionSetupException {
- super(funcRegistry);
- this.context = dbContext;
- this.queryContext = queryContext;
- this.connection = connection;
- this.accountingUserConnection = new
AccountingUserConnection(connection, sendingAccountor, statusHandler);
- this.fragment = fragment;
- contextInformation = new ContextInformation(fragment.getCredentials(),
fragment.getContext());
-
- logger.debug("Getting initial memory allocation of {}",
fragment.getMemInitial());
- logger.debug("Fragment max allocation: {}", fragment.getMemMax());
-
- final OptionList list;
- if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty())
{
- list = new OptionList();
- } else {
- try {
- list =
dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(),
OptionList.class);
- } catch (final Exception e) {
- throw new ExecutionSetupException("Failure while reading plan
options.", e);
- }
- }
- fragmentOptions = new
FragmentOptionManager(context.getOptionManager(), list);
-
- executionControls = new ExecutionControls(fragmentOptions,
dbContext.getEndpoint());
-
- // Add the fragment context to the root allocator.
- // The QueryManager will call the root allocator to recalculate all
the memory limits for all the fragments
- try {
- allocator = context.getAllocator().newChildAllocator(
- "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
- fragment.getMemInitial(),
- fragment.getMemMax());
- Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
- } catch (final OutOfMemoryException e) {
- throw UserException.memoryError(e)
- .addContext("Fragment", getHandle().getMajorFragmentId() + ":" +
getHandle().getMinorFragmentId())
- .build(logger);
- } catch(final Throwable e) {
- throw new ExecutionSetupException("Failure while getting memory
allocator for fragment.", e);
- }
-
- stats = new FragmentStats(allocator, fragment.getAssignment());
- bufferManager = new BufferManagerImpl(this.allocator);
- constantValueHolderCache = Maps.newHashMap();
- }
+ FunctionImplementationRegistry getFunctionRegistry();
/**
- * TODO: Remove this constructor when removing the SimpleRootExec
(DRILL-2097). This is kept only to avoid modifying
- * the long list of test files.
+ * Returns a read-only version of the session options.
+ * @return the session options
*/
- public FragmentContext(DrillbitContext dbContext, PlanFragment fragment,
UserClientConnection connection,
- FunctionImplementationRegistry funcRegistry) throws
ExecutionSetupException {
- this(dbContext, fragment, null, connection, funcRegistry);
- }
+ OptionSet getOptionSet();
- public OptionManager getOptions() {
- return fragmentOptions;
- }
+ PhysicalPlanReader getPlanReader();
- @Override
- public OptionSet getOptionSet() {
- return fragmentOptions;
- }
+ ClusterCoordinator getClusterCoordinator();
- public void setBuffers(final IncomingBuffers buffers) {
- Preconditions.checkArgument(this.buffers == null, "Can only set
buffers once.");
- this.buffers = buffers;
- }
+ AccountingDataTunnel getDataTunnel(final
CoordinationProtos.DrillbitEndpoint endpoint);
- public void setExecutorState(final ExecutorState executorState) {
- Preconditions.checkArgument(this.executorState == null, "ExecutorState
can only be set once.");
- this.executorState = executorState;
- }
+ AccountingUserConnection getUserDataTunnel();
- public void fail(final Throwable cause) {
- executorState.fail(cause);
- }
+ void setBuffers(final IncomingBuffers buffers);
+
+ boolean isImpersonationEnabled();
/**
- * Tells individual operations whether they should continue. In some
cases, an external event (typically cancellation)
- * will mean that the fragment should prematurely exit execution. Long
running operations should check this every so
- * often so that Drill is responsive to cancellation operations.
+ * Generates code for a class given a {@link ClassGenerator},
+ * and returns a single instance of the generated class. (Note
+ * that the name is a misnomer; it would be better called
+ * <tt>getImplementationInstance</tt>.)
*
- * @return false if the action should terminate immediately, true if
everything is okay.
+ * @param cg the class generator
+ * @return an instance of the generated class
*/
- @Override
- public boolean shouldContinue() {
- return executorState.shouldContinue();
- }
-
- @Override
- public DrillbitContext getDrillbitContext() {
- return context;
- }
+ <T> T getImplementationClass(final ClassGenerator<T> cg)
+ throws ClassTransformationException, IOException;
/**
- * This method is only used to construct InfoSchemaReader; it is for the
reader to get full schema, so here we
- * are going to return a fully initialized schema tree.
- * @return root schema's plus
+ * Generates code for a class given a {@link CodeGenerator},
+ * and returns a single instance of the generated class. (Note
+ * that the name is a misnomer; it would be better called
+ * <tt>getImplementationInstance</tt>.)
+ *
+ * @param cg the code generator
+ * @return an instance of the generated class
*/
- public SchemaPlus getFullRootSchema() {
- if (queryContext == null) {
- fail(new UnsupportedOperationException("Schema tree can only be
created in root fragment. " +
- "This is a non-root fragment."));
- return null;
- }
-
- final boolean isImpersonationEnabled = isImpersonationEnabled();
- // If impersonation is enabled, we want to view the schema as query
user and suppress authorization errors. As for
- // InfoSchema purpose we want to show tables the user has permissions
to list or query. If impersonation is
- // disabled view the schema as Drillbit process user and throw
authorization errors to client.
- SchemaConfig schemaConfig = SchemaConfig
- .newBuilder(
- isImpersonationEnabled ? queryContext.getQueryUserName() :
ImpersonationUtil.getProcessUserName(),
- queryContext)
- .setIgnoreAuthErrors(isImpersonationEnabled)
- .build();
-
- return queryContext.getFullRootSchema(schemaConfig);
- }
+ <T> T getImplementationClass(final CodeGenerator<T> cg)
+ throws ClassTransformationException, IOException;
/**
- * Get this node's identity.
- * @return A DrillbitEndpoint object.
+ * Generates code for a class given a {@link ClassGenerator}, and
+ * returns the specified number of instances of the generated class.
+ * (Note that the name is a misnomer; it would be better called
+ * <tt>getImplementationInstances</tt>.)
+ *
+ * @param cg the class generator
+ * @param instanceCount the number of instances to create
+ * @return list of instances of the generated class
*/
- public DrillbitEndpoint getIdentity() {
- return context.getEndpoint();
- }
-
- public FragmentStats getStats() {
- return stats;
- }
-
- @Override
- public ContextInformation getContextInformation() {
- return contextInformation;
- }
-
- public DrillbitEndpoint getForemanEndpoint() {
- return fragment.getForeman();
- }
+ <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int
instanceCount)
+ throws ClassTransformationException, IOException;
/**
- * The FragmentHandle for this Fragment
- * @return FragmentHandle
+ * Generates code for a class given a {@link CodeGenerator}, and
+ * returns the specified number of instances of the generated class.
+ * (Note that the name is a misnomer; it would be better called
+ * <tt>getImplementationInstances</tt>.)
+ *
+ * @param cg the code generator
+ * @param instanceCount the number of instances to create
+ * @return list of instances of the generated class
*/
- public FragmentHandle getHandle() {
- return fragment.getHandle();
- }
-
- public String getFragIdString() {
- final FragmentHandle handle = getHandle();
- final String frag = handle != null ? handle.getMajorFragmentId() + ":"
+ handle.getMinorFragmentId() : "0:0";
- return frag;
- }
+ <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int
instanceCount)
+ throws ClassTransformationException, IOException;
/**
- * Get this fragment's allocator.
- * @return the allocator
+ * Returns the set of execution controls used to inject faults into
+ * running code for testing.
+ *
+ * @return the execution controls
*/
- @Deprecated
- public BufferAllocator getAllocator() {
- if (allocator == null) {
- logger.debug("Fragment: " + getFragIdString() + " Allocator is
NULL");
- }
- return allocator;
- }
+ ExecutionControls getExecutionControls();
- public BufferAllocator getNewChildAllocator(final String operatorName,
- final int operatorId,
- final long initialReservation,
- final long maximumReservation) throws OutOfMemoryException {
- return allocator.newChildAllocator(
- "op:" + QueryIdHelper.getFragmentId(fragment.getHandle()) + ":" +
operatorId + ":" + operatorName,
- initialReservation,
- maximumReservation
- );
- }
+ /**
+ * Returns the Drill configuration for this run. Note that the config is
+ * global and immutable.
+ *
+ * @return the Drill configuration
+ */
+ DrillConfig getConfig();
- public boolean isOverMemoryLimit() {
- return allocator.isOverLimit();
- }
+ FragmentStats getStats();
- @Override
- protected CodeCompiler getCompiler() {
- return context.getCompiler();
- }
+ CodeCompiler getCompiler();
- public AccountingUserConnection getUserDataTunnel() {
- Preconditions.checkState(connection != null, "Only Root fragment can
get UserDataTunnel");
- return accountingUserConnection;
- }
+ Collection<CoordinationProtos.DrillbitEndpoint> getBits();
- public ControlTunnel getControlTunnel(final DrillbitEndpoint endpoint) {
- return context.getController().getTunnel(endpoint);
- }
+ CoordinationProtos.DrillbitEndpoint getForemanEndpoint();
- public AccountingDataTunnel getDataTunnel(final DrillbitEndpoint
endpoint) {
- AccountingDataTunnel tunnel = tunnels.get(endpoint);
- if (tunnel == null) {
- tunnel = new
AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint),
sendingAccountor, statusHandler);
- tunnels.put(endpoint, tunnel);
- }
- return tunnel;
- }
+ CoordinationProtos.DrillbitEndpoint getEndpoint();
- public IncomingBuffers getBuffers() {
- return buffers;
- }
+ Controller getController();
- public OperatorContext newOperatorContext(PhysicalOperator popConfig,
OperatorStats stats)
- throws OutOfMemoryException {
- OperatorContextImpl context = new OperatorContextImpl(popConfig, this,
stats);
- contexts.add(context);
- return context;
- }
+ OperatorCreatorRegistry getOperatorCreatorRegistry();
--- End diff --
Moved to ExecutorFragmentContext
---