http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java new file mode 100644 index 0000000..b3c84bc --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java @@ -0,0 +1,491 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.ops; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +import org.apache.calcite.schema.SchemaPlus; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.compile.CodeCompiler; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.expr.holders.ValueHolder; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry; +import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.proto.BitControl.PlanFragment; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.UserClientConnection; +import org.apache.drill.exec.rpc.control.Controller; +import org.apache.drill.exec.rpc.control.WorkEventBus; +import org.apache.drill.exec.rpc.user.UserServer; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.QueryProfileStoreContext; +import org.apache.drill.exec.server.options.FragmentOptionManager; +import org.apache.drill.exec.server.options.OptionList; 
+import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.server.options.OptionSet; +import org.apache.drill.exec.store.PartitionExplorer; +import org.apache.drill.exec.store.SchemaConfig; +import org.apache.drill.exec.testing.ExecutionControls; +import org.apache.drill.exec.util.ImpersonationUtil; +import org.apache.drill.exec.work.batch.IncomingBuffers; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import io.netty.buffer.DrillBuf; + +/** + * Contextual objects required for execution of a particular fragment. + * This is the implementation; use <tt>FragmentContext</tt> + * in code to allow tests to use test-time implementations. + */ +public class FragmentContextImpl extends BaseFragmentContext implements ExecutorFragmentContext { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContextImpl.class); + + private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap(); + private final List<OperatorContextImpl> contexts = Lists.newLinkedList(); + + private final DrillbitContext context; + private final UserClientConnection connection; // is null if this context is for non-root fragment + private final QueryContext queryContext; // is null if this context is for non-root fragment + private final FragmentStats stats; + private final BufferAllocator allocator; + private final PlanFragment fragment; + private final ContextInformation contextInformation; + private IncomingBuffers buffers; + private final OptionManager fragmentOptions; + private final BufferManager bufferManager; + private ExecutorState executorState; + private final ExecutionControls executionControls; + + private final SendingAccountor sendingAccountor = new SendingAccountor(); + private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() { + @Override + public 
void accept(final RpcException e) { + fail(e); + } + + @Override + public void interrupt(final InterruptedException e) { + if (executorState.shouldContinue()) { + logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e); + fail(e); + } + } + }; + + private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor); + private final AccountingUserConnection accountingUserConnection; + /** Stores constants and their holders by type */ + private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache; + + /** + * Create a FragmentContext instance for non-root fragment. + * + * @param dbContext DrillbitContext. + * @param fragment Fragment implementation. + * @param funcRegistry FunctionImplementationRegistry. + * @throws ExecutionSetupException + */ + public FragmentContextImpl(final DrillbitContext dbContext, final PlanFragment fragment, + final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException { + this(dbContext, fragment, null, null, funcRegistry); + } + + /** + * Create a FragmentContext instance for root fragment. + * + * @param dbContext DrillbitContext. + * @param fragment Fragment implementation. + * @param queryContext QueryContext. + * @param connection UserClientConnection. + * @param funcRegistry FunctionImplementationRegistry. 
+ * @throws ExecutionSetupException + */ + public FragmentContextImpl(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext, + final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry) + throws ExecutionSetupException { + super(funcRegistry); + this.context = dbContext; + this.queryContext = queryContext; + this.connection = connection; + this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler); + this.fragment = fragment; + contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext()); + + logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial()); + logger.debug("Fragment max allocation: {}", fragment.getMemMax()); + + final OptionList list; + if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) { + list = new OptionList(); + } else { + try { + list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class); + } catch (final Exception e) { + throw new ExecutionSetupException("Failure while reading plan options.", e); + } + } + fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list); + + executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint()); + + // Add the fragment context to the root allocator. 
+ // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments + try { + allocator = context.getAllocator().newChildAllocator( + "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()), + fragment.getMemInitial(), + fragment.getMemMax()); + Preconditions.checkNotNull(allocator, "Unable to acuqire allocator"); + } catch (final OutOfMemoryException e) { + throw UserException.memoryError(e) + .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId()) + .build(logger); + } catch(final Throwable e) { + throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e); + } + + stats = new FragmentStats(allocator, fragment.getAssignment()); + bufferManager = new BufferManagerImpl(this.allocator); + constantValueHolderCache = Maps.newHashMap(); + } + + /** + * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying + * the long list of test files. 
+ */ + public FragmentContextImpl(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection, + FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException { + this(dbContext, fragment, null, connection, funcRegistry); + } + + @Override + public OptionManager getOptions() { + return fragmentOptions; + } + + @Override + public PhysicalPlanReader getPlanReader() { + return context.getPlanReader(); + } + + @Override + public ClusterCoordinator getClusterCoordinator() { + return context.getClusterCoordinator(); + } + + @Override + public void setBuffers(final IncomingBuffers buffers) { + Preconditions.checkArgument(this.buffers == null, "Can only set buffers once."); + this.buffers = buffers; + } + public QueryProfileStoreContext getProfileStoreContext() { + return context.getProfileStoreContext(); + } + + @Override + public Set<Map.Entry<UserServer.BitToUserConnection, UserServer.BitToUserConnectionConfig>> getUserConnections() { + return context.getUserConnections(); + } + + public void setExecutorState(final ExecutorState executorState) { + Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be set once."); + this.executorState = executorState; + } + + public void fail(final Throwable cause) { + executorState.fail(cause); + } + + public SchemaPlus getFullRootSchema() { + if (queryContext == null) { + fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " + + "This is a non-root fragment.")); + return null; + } + + final boolean isImpersonationEnabled = isImpersonationEnabled(); + // If impersonation is enabled, we want to view the schema as query user and suppress authorization errors. As for + // InfoSchema purpose we want to show tables the user has permissions to list or query. If impersonation is + // disabled view the schema as Drillbit process user and throw authorization errors to client. 
+ SchemaConfig schemaConfig = SchemaConfig + .newBuilder( + isImpersonationEnabled ? queryContext.getQueryUserName() : ImpersonationUtil.getProcessUserName(), + queryContext) + .setIgnoreAuthErrors(isImpersonationEnabled) + .build(); + + return queryContext.getFullRootSchema(schemaConfig); + } + + public FragmentStats getStats() { + return stats; + } + + @Override + public Collection<DrillbitEndpoint> getBits() { + return context.getBits(); + } + + @Override + public ContextInformation getContextInformation() { + return contextInformation; + } + + @Override + public DrillbitEndpoint getForemanEndpoint() { + return fragment.getForeman(); + } + + @Override + public DrillbitEndpoint getEndpoint() { + return context.getEndpoint(); + } + + @Override + public Controller getController() { + return context.getController(); + } + + @Override + public OperatorCreatorRegistry getOperatorCreatorRegistry() { + return context.getOperatorCreatorRegistry(); + } + + @Override + public ExecutorService getScanDecodeExecutor() { + return context.getScanDecodeExecutor(); + } + + @Override + public ExecutorService getScanExecutor() { + return context.getScanExecutor(); + } + + /** + * The FragmentHandle for this Fragment + * @return FragmentHandle + */ + public FragmentHandle getHandle() { + return fragment.getHandle(); + } + + public String getFragIdString() { + final FragmentHandle handle = getHandle(); + final String frag = handle != null ? handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId() : "0:0"; + return frag; + } + + @Override + public boolean isUserAuthenticationEnabled() { + // TODO(DRILL-2097): Until SimpleRootExec tests are removed, we need to consider impersonation disabled if there is + // no config + if (getConfig() == null) { + return false; + } + + return getConfig().getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED); + } + + /** + * Get this fragment's allocator. 
+ * @return the allocator + */ + @Deprecated + public BufferAllocator getAllocator() { + if (allocator == null) { + logger.debug("Fragment: " + getFragIdString() + " Allocator is NULL"); + } + return allocator; + } + + @Override + public BufferAllocator getNewChildAllocator(final String operatorName, + final int operatorId, + final long initialReservation, + final long maximumReservation) throws OutOfMemoryException { + return allocator.newChildAllocator( + "op:" + QueryIdHelper.getFragmentId(fragment.getHandle()) + ":" + operatorId + ":" + operatorName, + initialReservation, + maximumReservation + ); + } + + public boolean isOverMemoryLimit() { + return allocator.isOverLimit(); + } + + @Override + public CodeCompiler getCompiler() { + return context.getCompiler(); + } + + @Override + public AccountingUserConnection getUserDataTunnel() { + Preconditions.checkState(connection != null, "Only Root fragment can get UserDataTunnel"); + return accountingUserConnection; + } + + @Override + public AccountingDataTunnel getDataTunnel(final DrillbitEndpoint endpoint) { + AccountingDataTunnel tunnel = tunnels.get(endpoint); + if (tunnel == null) { + tunnel = new AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint), sendingAccountor, statusHandler); + tunnels.put(endpoint, tunnel); + } + return tunnel; + } + + public IncomingBuffers getBuffers() { + return buffers; + } + + public OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats stats) + throws OutOfMemoryException { + OperatorContextImpl context = new OperatorContextImpl(popConfig, this, stats); + contexts.add(context); + return context; + } + + public OperatorContext newOperatorContext(PhysicalOperator popConfig) + throws OutOfMemoryException { + OperatorContextImpl context = new OperatorContextImpl(popConfig, this); + contexts.add(context); + return context; + } + + @Override + public DrillConfig getConfig() { + return context.getConfig(); + } + + @Override + public 
ExecutorState getExecutorState() { + return executorState; + } + + @Override + public ExecutionControls getExecutionControls() { + return executionControls; + } + + public String getQueryUserName() { + return fragment.getCredentials().getUserName(); + } + + public boolean isImpersonationEnabled() { + // TODO(DRILL-2097): Until SimpleRootExec tests are removed, we need to consider impersonation disabled if there is + // no config + if (getConfig() == null) { + return false; + } + + return getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED); + } + + @Override + public void close() { + waitForSendComplete(); + + // close operator context + for (OperatorContextImpl opContext : contexts) { + suppressingClose(opContext); + } + + suppressingClose(bufferManager); + suppressingClose(buffers); + suppressingClose(allocator); + } + + private void suppressingClose(final AutoCloseable closeable) { + try { + if (closeable != null) { + closeable.close(); + } + } catch (final Exception e) { + fail(e); + } + } + + @Override + public PartitionExplorer getPartitionExplorer() { + throw new UnsupportedOperationException(String.format("The partition explorer interface can only be used " + + "in functions that can be evaluated at planning time. 
Make sure that the %s configuration " + + "option is set to true.", PlannerSettings.CONSTANT_FOLDING.getOptionName())); + } + + @Override + public ValueHolder getConstantValueHolder(String value, MinorType type, Function<DrillBuf, ValueHolder> holderInitializer) { + if (!constantValueHolderCache.containsKey(value)) { + constantValueHolderCache.put(value, Maps.<MinorType, ValueHolder>newHashMap()); + } + + Map<MinorType, ValueHolder> holdersByType = constantValueHolderCache.get(value); + ValueHolder valueHolder = holdersByType.get(type); + if (valueHolder == null) { + valueHolder = holderInitializer.apply(getManagedBuffer()); + holdersByType.put(type, valueHolder); + } + return valueHolder; + } + + public ExecutorService getExecutor(){ + return context.getExecutor(); + } + + @Override + public void waitForSendComplete() { + sendingAccountor.waitForSendComplete(); + } + + @Override + public WorkEventBus getWorkEventbus() { + return context.getWorkBus(); + } + + public boolean isBuffersDone() { + Preconditions.checkState(this.buffers != null, "Incoming Buffers is not set in this fragment context"); + return buffers.isDone(); + } + + @Override + protected BufferManager getBufferManager() { + return bufferManager; + } +}
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java index 3d2fdd8..e6ea5b0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java @@ -60,7 +60,7 @@ public interface OperatorContext { BufferAllocator getAllocator(); - FragmentContextInterface getFragmentContext(); + FragmentContext getFragmentContext(); DrillBuf replace(DrillBuf old, int newSize); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java index e4c7dd9..550cea2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java @@ -43,11 +43,11 @@ class OperatorContextImpl extends BaseOperatorContext implements AutoCloseable { */ private ListeningExecutorService delegatePool; - public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context) throws OutOfMemoryException { + public OperatorContextImpl(PhysicalOperator popConfig, FragmentContextImpl context) throws OutOfMemoryException { this(popConfig, context, null); } - public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats) + public OperatorContextImpl(PhysicalOperator popConfig, FragmentContextImpl context, OperatorStats stats) 
throws OutOfMemoryException { super(context, context.getNewChildAllocator(popConfig.getClass().getSimpleName(), @@ -74,12 +74,8 @@ class OperatorContextImpl extends BaseOperatorContext implements AutoCloseable { return; } logger.debug("Closing context for {}", popConfig != null ? getName() : null); - - try { - super.close(); - } finally { - closed = true; - } + closed = true; + super.close(); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java index a38c3c2..e744f1e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java @@ -35,8 +35,6 @@ import com.carrotsearch.hppc.procedures.IntLongProcedure; import com.google.common.annotations.VisibleForTesting; public class OperatorStats { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorStats.class); - protected final int operatorId; protected final int operatorType; private final BufferAllocator allocator; @@ -61,7 +59,6 @@ public class OperatorStats { private long setupMark; private long waitMark; -// private long schemas; private int inputCount; public OperatorStats(OpProfileDef def, BufferAllocator allocator){ http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/RootFragmentContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/RootFragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/RootFragmentContext.java new file mode 100644 index 0000000..a8dab2c --- /dev/null +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/RootFragmentContext.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.ops; + +public interface RootFragmentContext extends ExchangeFragmentContext { + FragmentStats getStats(); + + void setExecutorState(final ExecutorState executorState); +} http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java index 6752d76..04d29e8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java @@ -36,7 +36,7 @@ public interface UdfUtilities { // Map between injectable classes and their respective getter methods // used for code generation - public static final ImmutableMap<Class<?>, String> INJECTABLE_GETTER_METHODS = + ImmutableMap<Class<?>, String> INJECTABLE_GETTER_METHODS = new ImmutableMap.Builder<Class<?>, 
String>() .put(DrillBuf.class, "getManagedBuffer") .put(PartitionExplorer.class, "getPartitionExplorer") http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java index b4c8536..b93237e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.physical.base; import com.fasterxml.jackson.annotation.JsonIgnore; -import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.graph.GraphVisitor; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -83,12 +82,6 @@ public abstract class AbstractBase implements PhysicalOperator { return SelectionVectorMode.NONE; } - // Not available. Presumably because Drill does not currently use - // this value, though it does appear in some test physical plans. 
-// public void setInitialAllocation(long alloc) { -// initialAllocation = alloc; -// } - @Override public long getInitialAllocation() { return initialAllocation; @@ -116,7 +109,6 @@ public abstract class AbstractBase implements PhysicalOperator { @Override public void setMaxAllocation(long maxAllocation) { this.maxAllocation = maxAllocation; - /*throw new DrillRuntimeException("Unsupported method: setMaxAllocation()");*/ } /** @@ -126,9 +118,6 @@ public abstract class AbstractBase implements PhysicalOperator { @Override @JsonIgnore public boolean isBufferedOperator() { return false; } - // @Override - // public void setBufferedOperator(boolean bo) {} - @Override public String getUserName() { return userName; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java index d8014f5..35c371c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java @@ -35,7 +35,7 @@ public interface Exchange extends PhysicalOperator { * Assignment dependency describes whether sender fragments depend on receiver fragment's endpoint assignment for * determining its parallelization and endpoint assignment and vice versa. */ - public enum ParallelizationDependency { + enum ParallelizationDependency { SENDER_DEPENDS_ON_RECEIVER, // Sending fragment depends on receiving fragment for parallelization RECEIVER_DEPENDS_ON_SENDER, // Receiving fragment depends on sending fragment for parallelization (default value). 
} @@ -46,7 +46,7 @@ public interface Exchange extends PhysicalOperator { * * @param senderLocations */ - public abstract void setupSenders(int majorFragmentId, List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException; + void setupSenders(int majorFragmentId, List<DrillbitEndpoint> senderLocations) throws PhysicalOperatorSetupException; /** * Inform this Exchange node about its receiver locations. This list should be index-ordered the same as the expected @@ -54,7 +54,7 @@ public interface Exchange extends PhysicalOperator { * * @param receiverLocations */ - public abstract void setupReceivers(int majorFragmentId, List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException; + void setupReceivers(int majorFragmentId, List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException; /** * Get the Sender associated with the given minorFragmentId. Cannot be called until after setupSenders() and @@ -66,7 +66,7 @@ public interface Exchange extends PhysicalOperator { * The feeding node for the requested sender. * @return The materialized sender for the given arguments. */ - public abstract Sender getSender(int minorFragmentId, PhysicalOperator child) throws PhysicalOperatorSetupException; + Sender getSender(int minorFragmentId, PhysicalOperator child) throws PhysicalOperatorSetupException; /** * Get the Receiver associated with the given minorFragmentId. Cannot be called until after setupSenders() and @@ -76,7 +76,7 @@ public interface Exchange extends PhysicalOperator { * The minor fragment id, must be in the range [0, fragment.width). * @return The materialized receiver for the given arguments. */ - public abstract Receiver getReceiver(int minorFragmentId); + Receiver getReceiver(int minorFragmentId); /** * Provide parallelization parameters for sender side of the exchange.
Output includes min width, @@ -86,7 +86,7 @@ public interface Exchange extends PhysicalOperator { * @return */ @JsonIgnore - public abstract ParallelizationInfo getSenderParallelizationInfo(List<DrillbitEndpoint> receiverFragmentEndpoints); + ParallelizationInfo getSenderParallelizationInfo(List<DrillbitEndpoint> receiverFragmentEndpoints); /** * Provide parallelization parameters for receiver side of the exchange. Output includes min width, @@ -96,18 +96,18 @@ public interface Exchange extends PhysicalOperator { * @return */ @JsonIgnore - public abstract ParallelizationInfo getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints); + ParallelizationInfo getReceiverParallelizationInfo(List<DrillbitEndpoint> senderFragmentEndpoints); /** * Return the feeding child of this operator node. * * @return */ - public PhysicalOperator getChild(); + PhysicalOperator getChild(); /** * Get the parallelization dependency of the Exchange. */ @JsonIgnore - public ParallelizationDependency getParallelizationDependency(); + ParallelizationDependency getParallelizationDependency(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java index 980f32c..7b76194 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java @@ -47,7 +47,7 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> { * @return */ @JsonIgnore - public boolean isExecutable(); + boolean isExecutable(); /** * Describes the SelectionVector Mode for the output stream from
this physical op. @@ -55,7 +55,7 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> { * @return */ @JsonIgnore - public SelectionVectorMode getSVMode(); + SelectionVectorMode getSVMode(); /** * Provides capability to build a set of output based on traversing a query graph tree. @@ -63,7 +63,7 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> { * @param physicalVisitor * @return */ - public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E; + <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E; /** * Regenerate with this node with a new set of children. This is used in the case of materialization or optimization. @@ -71,44 +71,44 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> { * @return */ @JsonIgnore - public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException; + PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException; /** * @return The memory to preallocate for this operator */ - public long getInitialAllocation(); + long getInitialAllocation(); /** * @return The maximum memory this operator can allocate */ - public long getMaxAllocation(); + long getMaxAllocation(); /** * * @param maxAllocation The max memory allocation to be set */ - public void setMaxAllocation(long maxAllocation); + void setMaxAllocation(long maxAllocation); /** * * @return True iff this operator manages its memory (including disk spilling) */ @JsonIgnore - public boolean isBufferedOperator(); + boolean isBufferedOperator(); // public void setBufferedOperator(boolean bo); @JsonProperty("@id") - public int getOperatorId(); + int getOperatorId(); @JsonProperty("@id") - public void setOperatorId(int id); + void setOperatorId(int id); @JsonProperty("cost") - public void setCost(double cost); + void setCost(double cost); @JsonProperty("cost") - 
public double getCost(); + double getCost(); /** * Name of the user whom to impersonate while setting up the implementation (RecordBatch) of this @@ -116,8 +116,8 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> { * @return */ @JsonProperty("userName") - public String getUserName(); + String getUserName(); @JsonIgnore - public int getOperatorType(); + int getOperatorType(); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java index d01e294..82887ec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java @@ -21,11 +21,11 @@ import java.util.List; import org.apache.drill.common.DeferredException; import org.apache.drill.exec.exception.OutOfMemoryException; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OpProfileDef; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.ops.OperatorUtilities; +import org.apache.drill.exec.ops.RootFragmentContext; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.CloseableRecordBatch; @@ -39,10 +39,10 @@ public abstract class BaseRootExec implements RootExec { protected OperatorStats stats = null; protected OperatorContext oContext = null; - protected FragmentContext fragmentContext = null; + protected RootFragmentContext fragmentContext = null; private List<CloseableRecordBatch> operators; - public BaseRootExec(final FragmentContext 
fragmentContext, final PhysicalOperator config) throws OutOfMemoryException { + public BaseRootExec(final RootFragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException { this.oContext = fragmentContext.newOperatorContext(config, stats); stats = new OperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), OperatorUtilities.getChildCount(config)), @@ -51,8 +51,8 @@ public abstract class BaseRootExec implements RootExec { this.fragmentContext = fragmentContext; } - public BaseRootExec(final FragmentContext fragmentContext, final OperatorContext oContext, - final PhysicalOperator config) throws OutOfMemoryException { + public BaseRootExec(final RootFragmentContext fragmentContext, final OperatorContext oContext, + final PhysicalOperator config) throws OutOfMemoryException { this.oContext = oContext; stats = new OperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), OperatorUtilities.getChildCount(config)), @@ -87,7 +87,7 @@ public abstract class BaseRootExec implements RootExec { public final boolean next() { // Stats should have been initialized assert stats != null; - if (!fragmentContext.shouldContinue()) { + if (!fragmentContext.getExecutorState().shouldContinue()) { return false; } try { @@ -156,7 +156,7 @@ public abstract class BaseRootExec implements RootExec { try { df.close(); } catch (Exception e) { - fragmentContext.fail(e); + fragmentContext.getExecutorState().fail(e); } } } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java index af99b5e..e9113ce 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java @@ -20,14 +20,12 @@ package org.apache.drill.exec.physical.impl; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; public interface BatchCreator<T extends PhysicalOperator> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCreator.class); - - public CloseableRecordBatch getBatch(FragmentContext context, T config, List<RecordBatch> children) + CloseableRecordBatch getBatch(ExecutorFragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException; } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java index b418fd4..2e0d14e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java @@ -27,7 +27,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.base.FragmentRoot; 
import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.impl.validate.IteratorValidatorInjector; @@ -66,7 +66,7 @@ public class ImplCreator { * @return RootExec of fragment. * @throws ExecutionSetupException */ - public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException { + public static RootExec getExec(ExecutorFragmentContext context, FragmentRoot root) throws ExecutionSetupException { Preconditions.checkNotNull(root); Preconditions.checkNotNull(context); @@ -99,14 +99,14 @@ public class ImplCreator { return rootExec; } catch(Exception e) { AutoCloseables.close(e, creator.getOperators()); - context.fail(e); + context.getExecutorState().fail(e); } return null; } /** Create RootExec and its children (RecordBatches) for given FragmentRoot */ - @SuppressWarnings("unchecked") - private RootExec getRootExec(final FragmentRoot root, final FragmentContext context) throws ExecutionSetupException { + + private RootExec getRootExec(final FragmentRoot root, final ExecutorFragmentContext context) throws ExecutionSetupException { final List<RecordBatch> childRecordBatches = getChildren(root, context); if (context.isImpersonationEnabled()) { @@ -131,7 +131,7 @@ public class ImplCreator { /** Create a RecordBatch and its children for given PhysicalOperator */ @VisibleForTesting - public RecordBatch getRecordBatch(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException { + public RecordBatch getRecordBatch(final PhysicalOperator op, final ExecutorFragmentContext context) throws ExecutionSetupException { Preconditions.checkNotNull(op); final List<RecordBatch> childRecordBatches = getChildren(op, context); @@ -164,9 +164,9 @@ public class ImplCreator { } /** Helper method to get OperatorCreator (RootCreator or BatchCreator) for given PhysicalOperator (root or non-root) */ - private Object getOpCreator(PhysicalOperator op, final FragmentContext context) throws 
ExecutionSetupException { + private Object getOpCreator(PhysicalOperator op, final ExecutorFragmentContext context) throws ExecutionSetupException { final Class<? extends PhysicalOperator> opClass = op.getClass(); - Object opCreator = context.getDrillbitContext().getOperatorCreatorRegistry().getOperatorCreator(opClass); + Object opCreator = context.getOperatorCreatorRegistry().getOperatorCreator(opClass); if (opCreator == null) { throw new UnsupportedOperationException( String.format("BatchCreator for PhysicalOperator type '%s' not found.", opClass.getCanonicalName())); @@ -176,7 +176,7 @@ public class ImplCreator { } /** Helper method to traverse the children of given PhysicalOperator and create RecordBatches for children recursively */ - private List<RecordBatch> getChildren(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException { + private List<RecordBatch> getChildren(final PhysicalOperator op, final ExecutorFragmentContext context) throws ExecutionSetupException { List<RecordBatch> children = Lists.newArrayList(); for (PhysicalOperator child : op) { children.add(getRecordBatch(child, context)); @@ -184,5 +184,4 @@ public class ImplCreator { return children; } - } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java index 690a662..0ef84b9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java @@ -21,7 +21,7 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import 
org.apache.drill.exec.exception.OutOfMemoryException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.MergingReceiverPOP; import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch; import org.apache.drill.exec.record.RecordBatch; @@ -33,7 +33,7 @@ public class MergingReceiverCreator implements BatchCreator<MergingReceiverPOP> @SuppressWarnings("resource") @Override - public MergingRecordBatch getBatch(FragmentContext context, + public MergingRecordBatch getBatch(ExecutorFragmentContext context, MergingReceiverPOP receiver, List<RecordBatch> children) throws ExecutionSetupException, OutOfMemoryException { http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java index f3d9524..17e4027 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java @@ -20,12 +20,10 @@ package org.apache.drill.exec.physical.impl; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.record.RecordBatch; public interface RootCreator<T extends PhysicalOperator> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootCreator.class); - - public RootExec getRoot(FragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException; + RootExec 
getRoot(ExecutorFragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException; } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index e0d1545..f6a4863 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -82,14 +82,13 @@ public class ScanBatch implements CloseableRecordBatch { private String currentReaderClassName; /** * - * @param subScanConfig * @param context * @param oContext * @param readerList * @param implicitColumnList : either an emptylist when all the readers do not have implicit * columns, or there is a one-to-one mapping between reader and implicitColumns. 
*/ - public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, + public ScanBatch(FragmentContext context, OperatorContext oContext, List<RecordReader> readerList, List<Map<String, String>> implicitColumnList) { this.context = context; @@ -126,8 +125,7 @@ public class ScanBatch implements CloseableRecordBatch { public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, List<RecordReader> readers) throws ExecutionSetupException { - this(subScanConfig, context, - context.newOperatorContext(subScanConfig), + this(context, context.newOperatorContext(subScanConfig), readers, Collections.<Map<String, String>> emptyList()); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index d9abf40..46dc450 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -22,8 +22,9 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.ops.AccountingUserConnection; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.ops.MetricDef; +import org.apache.drill.exec.ops.RootFragmentContext; import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer; @@ -38,11 +39,10 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory; import 
com.google.common.base.Preconditions; public class ScreenCreator implements RootCreator<Screen> { - //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class); private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ScreenCreator.class); @Override - public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) + public RootExec getRoot(ExecutorFragmentContext context, Screen config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkNotNull(children); Preconditions.checkArgument(children.size() == 1); @@ -52,7 +52,7 @@ public class ScreenCreator implements RootCreator<Screen> { public static class ScreenRoot extends BaseRootExec { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class); private final RecordBatch incoming; - private final FragmentContext context; + private final RootFragmentContext context; private final AccountingUserConnection userConnection; private RecordMaterializer materializer; @@ -67,13 +67,17 @@ public class ScreenCreator implements RootCreator<Screen> { } } - public ScreenRoot(FragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException { + public ScreenRoot(RootFragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException { super(context, config); this.context = context; this.incoming = incoming; userConnection = context.getUserDataTunnel(); } + public RootFragmentContext getContext() { + return context; + } + @Override public boolean innerNext() { IterOutcome outcome = next(incoming); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java index 2f33193..9231aef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java @@ -22,8 +22,9 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.ops.AccountingDataTunnel; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.ops.MetricDef; +import org.apache.drill.exec.ops.RootFragmentContext; import org.apache.drill.exec.physical.config.SingleSender; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.BatchSchema; @@ -36,7 +37,7 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory; public class SingleSenderCreator implements RootCreator<SingleSender>{ @Override - public RootExec getRoot(FragmentContext context, SingleSender config, List<RecordBatch> children) + public RootExec getRoot(ExecutorFragmentContext context, SingleSender config, List<RecordBatch> children) throws ExecutionSetupException { assert children != null && children.size() == 1; return new SingleSenderRootExec(context, children.iterator().next(), config); @@ -64,7 +65,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ } } - public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException { + public SingleSenderRootExec(RootFragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException { super(context, context.newOperatorContext(config, null), config); this.incoming = batch; assert incoming != null; 
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java index 7f9aca4..97e26b6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java @@ -26,7 +26,6 @@ import javax.inject.Named; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.ExpandableHyperContainer; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 442a753..1683286 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -281,7 +281,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { } catch(SchemaChangeException | ClassTransformationException | IOException ex) { kill(false); logger.error("Failure during query", ex); - context.fail(ex); + context.getExecutorState().fail(ex); 
return IterOutcome.STOP; } } @@ -295,7 +295,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context); SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context); if (copier == null) { - copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch, null); + copier = RemovingRecordBatch.getGenerated4Copier(batch, context, newContainer, newBatch, null); } else { for (VectorWrapper<?> i : batch) { @@ -323,7 +323,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { selectionVector4.clear(); c.clear(); VectorContainer newQueue = new VectorContainer(); - builder.build(context, newQueue); + builder.build(newQueue); priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent()); builder.getSv4().clear(); selectionVector4.clear(); @@ -336,7 +336,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { private PriorityQueue createNewPriorityQueue(VectorAccessible batch, int limit) throws SchemaChangeException, ClassTransformationException, IOException { return createNewPriorityQueue( - mainMapping, leftMapping, rightMapping, context.getOptions(), context.getFunctionRegistry(), context.getDrillbitContext().getCompiler(), + mainMapping, leftMapping, rightMapping, context.getOptions(), context.getFunctionRegistry(), context.getCompiler(), config.getOrderings(), batch, unionTypeEnabled, codegenDump, limit, oContext.getAllocator(), schema.getSelectionVectorMode()); } @@ -415,7 +415,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { final SelectionVector4 selectionVector4 = priorityQueue.getSv4(); final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context); final SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context); - copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), 
newContainer, newBatch, null); + copier = RemovingRecordBatch.getGenerated4Copier(batch, context, newContainer, newBatch, null); @SuppressWarnings("resource") SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator()); try { @@ -434,7 +434,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { selectionVector4.clear(); c.clear(); final VectorContainer oldSchemaContainer = new VectorContainer(oContext); - builder.build(context, oldSchemaContainer); + builder.build(oldSchemaContainer); oldSchemaContainer.setRecordCount(builder.getSv4().getCount()); final VectorContainer newSchemaContainer = SchemaUtil.coerceContainer(oldSchemaContainer, this.schema, oContext); newSchemaContainer.buildSchema(SelectionVectorMode.FOUR_BYTE); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java index e815bff..d477744 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java @@ -20,22 +20,18 @@ package org.apache.drill.exec.physical.impl.TopN; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.TopN; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; -public class TopNSortBatchCreator implements BatchCreator<TopN>{ - static final 
org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopNSortBatchCreator.class); - +public class TopNSortBatchCreator implements BatchCreator<TopN> { @Override - public TopNBatch getBatch(FragmentContext context, TopN config, List<RecordBatch> children) + public TopNBatch getBatch(ExecutorFragmentContext context, TopN config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new TopNBatch(config, context, children.iterator().next()); } - - } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java index 0ea17d6..e98a7c6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java @@ -118,7 +118,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { } catch(IOException ex) { logger.error("Failure during query", ex); kill(false); - context.fail(ex); + context.getExecutorState().fail(ex); return IterOutcome.STOP; } @@ -185,7 +185,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { try { recordWriter.cleanup(); } catch(IOException ex) { - context.fail(ex); + context.getExecutorState().fail(ex); } finally { try { if (!processed) { http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index b3d68d3..47f1017 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -186,7 +186,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { return aggregator.getOutcome(); case UPDATE_AGGREGATOR: - context.fail(UserException.unsupportedError() + context.getExecutorState().fail(UserException.unsupportedError() .message(SchemaChangeException.schemaChanged( "Hash aggregate does not support schema change", incomingSchema, @@ -212,7 +212,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { this.aggregator = createAggregatorInternal(); return true; } catch (SchemaChangeException | ClassTransformationException | IOException ex) { - context.fail(ex); + context.getExecutorState().fail(ex); container.clear(); incoming.kill(false); return false; @@ -227,9 +227,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { ClassGenerator<HashAggregator> cg = top.getRoot(); ClassGenerator<HashAggregator> cgInner = cg.getInnerGenerator("BatchHolder"); top.plainJavaCapable(true); - // Uncomment out this line to debug the generated code. - // top.saveCodeForDebugging(true); - container.clear(); int numGroupByExprs = (popConfig.getGroupByExprs() != null) ? 
popConfig.getGroupByExprs().size() : 0; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java index 1397342..ed203e5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.impl.BatchCreator; @@ -27,15 +28,11 @@ import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; -public class HashAggBatchCreator implements BatchCreator<HashAggregate>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggBatchCreator.class); - +public class HashAggBatchCreator implements BatchCreator<HashAggregate> { @Override - public HashAggBatch getBatch(FragmentContext context, HashAggregate config, List<RecordBatch> children) + public HashAggBatch getBatch(ExecutorFragmentContext context, HashAggregate config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new HashAggBatch(config, children.iterator().next(), context); } - - } 
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 89ba59b..ef8d9d9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -300,7 +300,9 @@ public abstract class HashAggTemplate implements HashAggregator { } @Override - public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException { + public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, + RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, + TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException { if (valueExprs == null || valueFieldIds == null) { throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables."); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java index 16b5499..3384e67 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java @@ -35,34 +35,35 @@ import org.apache.drill.exec.record.VectorContainer; public interface HashAggregator { - public static TemplateClassDefinition<HashAggregator> TEMPLATE_DEFINITION = + TemplateClassDefinition<HashAggregator> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashAggregator>(HashAggregator.class, HashAggTemplate.class); - public static enum AggOutcome { + enum AggOutcome { RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR, CALL_WORK_AGAIN } // For returning results from outputCurrentBatch // OK - batch returned, NONE - end of data, RESTART - call again - public enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART } + enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART } - public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException; + void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, + LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException; - public abstract IterOutcome getOutcome(); + IterOutcome getOutcome(); - public abstract int getOutputCount(); + int getOutputCount(); - public abstract AggOutcome doWork(); + AggOutcome 
doWork(); - public abstract void cleanup(); + void cleanup(); - public abstract boolean allFlushed(); + boolean allFlushed(); - public abstract boolean buildComplete(); + boolean buildComplete(); - public abstract AggIterOutcome outputCurrentBatch(); + AggIterOutcome outputCurrentBatch(); - public abstract boolean earlyOutput(); + boolean earlyOutput(); - public abstract RecordBatch getNewIncoming(); + RecordBatch getNewIncoming(); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java index ac4b29d..c473b94 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java @@ -45,19 +45,15 @@ public class SpilledRecordbatch implements CloseableRecordBatch { private int spilledBatches; private FragmentContext context; private BatchSchema schema; - private OperatorContext oContext; private SpillSet spillSet; - // Path spillStreamPath; private String spillFile; VectorAccessibleSerializable vas; - public SpilledRecordbatch(String spillFile,/* Path spillStreamPath,*/ int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) { + public SpilledRecordbatch(String spillFile, int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) { this.context = context; this.schema = schema; this.spilledBatches = spilledBatches; - this.oContext = oContext; this.spillSet = spillSet; - //this.spillStreamPath = spillStreamPath; this.spillFile = spillFile; 
vas = new VectorAccessibleSerializable(oContext.getAllocator()); container = vas.get(); @@ -126,7 +122,7 @@ public class SpilledRecordbatch implements CloseableRecordBatch { @Override public IterOutcome next() { - if ( ! context.shouldContinue() ) { return IterOutcome.STOP; } + if ( ! context.getExecutorState().shouldContinue() ) { return IterOutcome.STOP; } if ( spilledBatches <= 0 ) { // no more batches to read in this partition this.close(); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index b33dbd6..34ab97e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -202,7 +202,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } return outcome; case UPDATE_AGGREGATOR: - context.fail(UserException.unsupportedError() + context.getExecutorState().fail(UserException.unsupportedError() .message(SchemaChangeException.schemaChanged("Streaming aggregate does not support schema changes", incomingSchema, incoming.getSchema()).getMessage()) .build(logger)); close(); @@ -263,7 +263,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { this.aggregator = createAggregatorInternal(); return true; } catch (SchemaChangeException | ClassTransformationException | IOException ex) { - context.fail(ex); + context.getExecutorState().fail(ex); container.clear(); incoming.kill(false); return false; @@ -275,8 +275,6 @@ public class StreamingAggBatch extends 
AbstractRecordBatch<StreamingAggregate> { private StreamingAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{ ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); - // Uncomment out this line to debug the generated code. - // cg.getCodeGenerator().saveCodeForDebugging(true); container.clear(); LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().size()]; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java index cac5b06..864271e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.physical.impl.BatchCreator; @@ -27,15 +28,11 @@ import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; -public class StreamingAggBatchCreator implements BatchCreator<StreamingAggregate>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatchCreator.class); - +public class 
StreamingAggBatchCreator implements BatchCreator<StreamingAggregate> { @Override - public StreamingAggBatch getBatch(FragmentContext context, StreamingAggregate config, List<RecordBatch> children) + public StreamingAggBatch getBatch(ExecutorFragmentContext context, StreamingAggregate config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new StreamingAggBatch(config, children.iterator().next(), context); } - - } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java index 61c82d8..7e13eb3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.aggregate; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java index 01122be..6c42a1a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.broadcastsender; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.BroadcastSender; import org.apache.drill.exec.physical.impl.RootCreator; import org.apache.drill.exec.physical.impl.RootExec; @@ -30,7 +30,7 @@ import com.google.common.collect.Iterators; public class BroadcastSenderCreator implements RootCreator<BroadcastSender> { @Override - public RootExec getRoot(FragmentContext context, BroadcastSender config, List<RecordBatch> children) throws ExecutionSetupException { + public RootExec getRoot(ExecutorFragmentContext context, BroadcastSender config, List<RecordBatch> children) throws ExecutionSetupException { assert children != null && children.size() == 1; return new BroadcastSenderRootExec(context, Iterators.getOnlyElement(children.iterator()), config); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java index 80d7744..bd4a1ec 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java @@ -21,8 +21,8 @@ import java.util.List; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.ops.AccountingDataTunnel; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; +import org.apache.drill.exec.ops.RootFragmentContext; import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.config.BroadcastSender; import org.apache.drill.exec.physical.impl.BaseRootExec; @@ -57,7 +57,7 @@ public class BroadcastSenderRootExec extends BaseRootExec { } } - public BroadcastSenderRootExec(FragmentContext context, + public BroadcastSenderRootExec(RootFragmentContext context, RecordBatch incoming, BroadcastSender config) throws OutOfMemoryException { super(context, context.newOperatorContext(config, null), config); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java index 9bab67d..703868e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java @@ -225,12 +225,11 @@ public class ChainedHashTable { setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingProbeMapping, incomingProbe, keyExprsProbe, true); HashTable ht = context.getImplementationClass(top); - ht.setup(htConfig, context, allocator, 
incomingBuild, incomingProbe, outgoing, htContainerOrig); + ht.setup(htConfig, allocator, incomingBuild, incomingProbe, outgoing, htContainerOrig); return ht; } - private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping, MappingSet htableMapping, LogicalExpression[] keyExprs, List<Comparator> comparators, TypedFieldId[] htKeyFieldIds) throws SchemaChangeException { http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java index 3749e3e..d28fe49 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java @@ -20,64 +20,58 @@ package org.apache.drill.exec.physical.impl.common; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.common.exceptions.RetryAfterSpillException; public interface HashTable { - - public static TemplateClassDefinition<HashTable> TEMPLATE_DEFINITION = - new TemplateClassDefinition<HashTable>(HashTable.class, HashTableTemplate.class); - - /** - * The initial default capacity of the hash table (in terms of number of buckets). 
- */ - static final public int DEFAULT_INITIAL_CAPACITY = 1 << 16; + TemplateClassDefinition<HashTable> TEMPLATE_DEFINITION = + new TemplateClassDefinition<>(HashTable.class, HashTableTemplate.class); /** * The maximum capacity of the hash table (in terms of number of buckets). */ - static final public int MAXIMUM_CAPACITY = 1 << 30; + int MAXIMUM_CAPACITY = 1 << 30; /** * The default load factor of a hash table. */ - static final public float DEFAULT_LOAD_FACTOR = 0.75f; + float DEFAULT_LOAD_FACTOR = 0.75f; - static public enum PutStatus {KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED, KEY_ADDED_LAST, PUT_FAILED;} + enum PutStatus {KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED, KEY_ADDED_LAST, PUT_FAILED;} /** * The batch size used for internal batch holders */ - static final public int BATCH_SIZE = Character.MAX_VALUE + 1; - static final public int BATCH_MASK = 0x0000FFFF; + int BATCH_SIZE = Character.MAX_VALUE + 1; + int BATCH_MASK = 0x0000FFFF; - public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig); + void setup(HashTableConfig htConfig, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, + VectorContainer htContainerOrig); - public void updateBatches() throws SchemaChangeException; + void updateBatches() throws SchemaChangeException; - public int getHashCode(int incomingRowIdx) throws SchemaChangeException; + int getHashCode(int incomingRowIdx) throws SchemaChangeException; - public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException; + PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException; - public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException; + int containsKey(int incomingRowIdx, 
boolean isProbe) throws SchemaChangeException; - public void getStats(HashTableStats stats); + void getStats(HashTableStats stats); - public int size(); + int size(); - public boolean isEmpty(); + boolean isEmpty(); - public void clear(); + void clear(); - public void reinit(RecordBatch newIncoming); + void reinit(RecordBatch newIncoming); - public void reset(); + void reset(); - public void setMaxVarcharSize(int size); + void setMaxVarcharSize(int size); - public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords); + boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index 6cbbdcb..272d782 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -29,7 +29,6 @@ import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; @@ -48,7 +47,6 @@ public abstract class HashTableTemplate implements HashTable { private static final boolean EXTRA_DEBUG = false; private static final int EMPTY_SLOT = -1; 
- // private final int MISSING_VALUE = 65544; // A hash 'bucket' consists of the start index to indicate start of a hash chain @@ -78,8 +76,6 @@ public abstract class HashTableTemplate implements HashTable { // Placeholder for the current index while probing the hash table private IndexPointer currentIdxHolder; -// private FragmentContext context; - private BufferAllocator allocator; // The incoming build side record batch @@ -451,7 +447,7 @@ public abstract class HashTableTemplate implements HashTable { @Override - public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) { + public void setup(HashTableConfig htConfig, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) { float loadf = htConfig.getLoadFactor(); int initialCap = htConfig.getInitialCapacity(); @@ -470,7 +466,6 @@ public abstract class HashTableTemplate implements HashTable { } this.htConfig = htConfig; -// this.context = context; this.allocator = allocator; this.incomingBuild = incomingBuild; this.incomingProbe = incomingProbe;
