http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java index f4a9825..aa067cf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java @@ -145,7 +145,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { try { doWork(); } catch (DrillException e) { - context.fail(e); + context.getExecutorState().fail(e); cleanup(); return IterOutcome.STOP; }
http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index c212593..9cde1a5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -201,7 +201,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { // where it would have been passed to context.fail() // passing the exception directly to context.fail(e) will let the cleanup process continue instead of stopping // right away, this will also make sure we collect any additional exception we may get while cleaning up - context.fail(e); + context.getExecutorState().fail(e); } } } @@ -483,7 +483,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { mSorter.sort(this.container); // sort may have prematurely exited due to should continue returning false. - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { return IterOutcome.STOP; } @@ -522,12 +522,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } catch (SchemaChangeException ex) { kill(false); - context.fail(UserException.unsupportedError(ex) + context.getExecutorState().fail(UserException.unsupportedError(ex) .message("Sort doesn't currently support sorts with changing schemas").build(logger)); return IterOutcome.STOP; } catch(ClassTransformationException | IOException ex) { kill(false); - context.fail(ex); + context.getExecutorState().fail(ex); return IterOutcome.STOP; } catch (UnsupportedOperationException e) { throw new RuntimeException(e); @@ -650,7 +650,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { try { Thread.sleep(waitTime * 1000); } catch(final InterruptedException e) { - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { throw e; } } @@ -688,11 +688,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } private MSorter createNewMSorter() throws ClassTransformationException, IOException, SchemaChangeException { - return createNewMSorter(this.context, this.popConfig.getOrderings(), this, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING); + return createNewMSorter(context, this.popConfig.getOrderings(), this, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING); } - private MSorter createNewMSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) - throws ClassTransformationException, IOException, SchemaChangeException{ + private MSorter createNewMSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet + rightMapping) + throws ClassTransformationException, IOException, SchemaChangeException { CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getOptions()); ClassGenerator<MSorter> g = cg.getRoot(); g.setMappingSet(mainMapping); @@ -735,7 +736,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } public SingleBatchSorter createNewSorter(FragmentContext context, VectorAccessible batch) - throws ClassTransformationException, IOException, SchemaChangeException{ + throws ClassTransformationException, IOException, SchemaChangeException { CodeGenerator<SingleBatchSorter> cg = CodeGenerator.get(SingleBatchSorter.TEMPLATE_DEFINITION, context.getOptions()); cg.plainJavaCapable(true); // This class can generate plain-old Java. http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java index e579fc2..6a60196 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java @@ -22,19 +22,19 @@ import java.util.List; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.ExternalSort; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.server.options.OptionManager; import com.google.common.base.Preconditions; +import org.apache.drill.exec.server.options.OptionManager; public class ExternalSortBatchCreator implements BatchCreator<ExternalSort>{ @Override - public AbstractRecordBatch<ExternalSort> getBatch(FragmentContext context, ExternalSort config, List<RecordBatch> children) + public AbstractRecordBatch<ExternalSort> getBatch(ExecutorFragmentContext context, ExternalSort config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java index 2f3d2f6..9b69170 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java @@ -130,7 +130,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { while (runStarts.size() > 1) { // check if we're cancelled/failed frequently - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { return; } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java index 733ea5e..4dbee3e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java @@ -19,12 +19,12 @@ package org.apache.drill.exec.physical.impl.xsort; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContextInterface; +import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.selection.SelectionVector2; public interface SingleBatchSorter { - public void setup(FragmentContextInterface context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException; + public void setup(FragmentContext context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException; public void sort(SelectionVector2 vector2) throws SchemaChangeException; public static TemplateClassDefinition<SingleBatchSorter> TEMPLATE_DEFINITION = http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java index 0f4680d..4a1af4e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit; import javax.inject.Named; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContextInterface; +import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.selection.SelectionVector2; @@ -38,7 +38,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In private SelectionVector2 vector2; @Override - public void setup(FragmentContextInterface context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException{ + public void setup(FragmentContext context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException{ Preconditions.checkNotNull(vector2); this.vector2 = vector2; try { @@ -76,7 +76,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In } } - public abstract void doSetup(@Named("context") FragmentContextInterface context, + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java index 9150fe3..23e66a0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java @@ -135,8 +135,7 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector; * into new batches of a size determined by that operator.</li> * <li>A series of batches, without a selection vector, if the sort spills to * disk. In this case, the downstream operator will still be a selection vector - * remover, but there is nothing for that operator to remove. Each batch is - * of the size set by {@link #MAX_MERGED_BATCH_SIZE}.</li> + * remover, but there is nothing for that operator to remove. * </ul> * Note that, even in the in-memory sort case, this operator could do the copying * to eliminate the extra selection vector remover. That is left as an exercise @@ -375,7 +374,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { // sort may have prematurely exited due to shouldContinue() returning false. - if (! context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { sortState = SortState.DONE; return IterOutcome.STOP; } @@ -440,8 +439,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { /** * Handle a new schema from upstream. The ESB is quite limited in its ability * to handle schema changes. - * - * @param upstream the status code from upstream: either OK or OK_NEW_SCHEMA */ private void setupSchema() { @@ -482,6 +479,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { * <p> * Some Drill code ends up calling close() two or more times. The code * here protects itself from these undesirable semantics. + * </p> */ @Override http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java index 698e32f..625d360 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java @@ -24,7 +24,7 @@ import javax.inject.Named; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BaseAllocator; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.FragmentContextInterface; +import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector4; @@ -49,7 +49,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { */ private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue(); - private FragmentContextInterface context; + private FragmentContext context; /** * Controls the maximum size of batches exposed to downstream @@ -57,7 +57,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { private int desiredRecordBatchCount; @Override - public void setup(final FragmentContextInterface context, final BufferAllocator allocator, final SelectionVector4 vector4, + public void setup(final FragmentContext context, final BufferAllocator allocator, final SelectionVector4 vector4, final VectorContainer hyperBatch, int outputBatchSize, int desiredBatchSize) throws SchemaChangeException{ // we pass in the local hyperBatch since that is where we'll be reading data. Preconditions.checkNotNull(vector4); @@ -162,7 +162,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { final int totalCount = this.vector4.getTotalCount(); // check if we're cancelled/failed recently - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { return; } int outIndex = 0; @@ -233,7 +233,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { } } - public abstract void doSetup(@Named("context") FragmentContextInterface context, + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java index 428f6f8..8eae8b7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.xsort.managed; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.FragmentContextInterface; +import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector4; @@ -30,7 +30,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4; */ public interface MSorter { - public void setup(FragmentContextInterface context, BufferAllocator allocator, SelectionVector4 vector4, + public void setup(FragmentContext context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch, int outputBatchSize, int desiredBatchSize) throws SchemaChangeException; public void sort(); public SelectionVector4 getSV4(); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index eb90614..015d078 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -103,7 +103,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements } public final IterOutcome next(final RecordBatch b) { - if(!context.shouldContinue()) { + if(!context.getExecutorState().shouldContinue()) { return IterOutcome.STOP; } return next(0, b); @@ -113,7 +113,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements IterOutcome next = null; stats.stopProcessing(); try{ - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { return IterOutcome.STOP; } next = b.next(); @@ -147,7 +147,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements return IterOutcome.NONE; case OUT_OF_MEMORY: // because we don't support schema changes, it is safe to fail the query right away - context.fail(UserException.memoryError() + context.getExecutorState().fail(UserException.memoryError() .build(logger)); // FALL-THROUGH case STOP: http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java index 4a9828c..07312a3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java @@ -85,7 +85,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte } catch (SchemaChangeException ex) { kill(false); logger.error("Failure during query", ex); - context.fail(ex); + context.getExecutorState().fail(ex); return IterOutcome.STOP; } finally { stats.stopSetup(); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java index e7c0ee5..381071f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java @@ -21,14 +21,10 @@ import io.netty.buffer.ByteBuf; import java.util.concurrent.Semaphore; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.BitData.RpcType; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.record.FragmentWritableBatch; -import org.apache.drill.exec.rpc.DrillRpcFuture; -import org.apache.drill.exec.rpc.FutureBitCommand; import org.apache.drill.exec.rpc.ListeningCommand; -import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.testing.ControlsInjector; @@ -96,23 +92,6 @@ public class DataTunnel { } } - // TODO: This is not used anywhere. Can we remove this method and SendBatchAsyncFuture? - public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, FragmentWritableBatch batch) { - SendBatchAsyncFuture b = new SendBatchAsyncFuture(batch, context); - try{ - sendingSemaphore.acquire(); - manager.runCommand(b); - }catch(final InterruptedException e){ - b.connectionFailed(FailureType.CONNECTION, new RpcException("Interrupted while trying to get sending semaphore.", e)); - - // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the - // interruption and respond to it if it wants to. - Thread.currentThread().interrupt(); - } - return b.getFuture(); - } - - private class ThrottlingOutcomeListener implements RpcOutcomeListener<Ack>{ RpcOutcomeListener<Ack> inner; @@ -166,26 +145,4 @@ public class DataTunnel { super.connectionFailed(type, t); } } - - private class SendBatchAsyncFuture extends FutureBitCommand<Ack, DataClientConnection> { - final FragmentWritableBatch batch; - final FragmentContext context; - - public SendBatchAsyncFuture(FragmentWritableBatch batch, FragmentContext context) { - super(); - this.batch = batch; - this.context = context; - } - - @Override - public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, DataClientConnection connection) { - connection.send(new ThrottlingOutcomeListener(outcomeListener), RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers()); - } - - @Override - public String toString() { - return "SendBatch [batch.header=" + batch.getHeader() + "]"; - } - } - } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java index 1f9c81c..5a0e14d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java @@ -252,6 +252,14 @@ public class BootStrapContext implements AutoCloseable { } } + if (scanExecutor != null) { + scanExecutor.shutdown(); + } + + if (scanDecodeExecutor != null) { + scanDecodeExecutor.shutdownNow(); + } + try { AutoCloseables.close(allocator, authProvider); shutdown(loop); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java index 90f5623..f0b7243 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/FragmentOptionManager.java @@ -19,11 +19,12 @@ package org.apache.drill.exec.server.options; import com.google.common.collect.Maps; import org.apache.drill.common.map.CaseInsensitiveMap; +import org.apache.drill.exec.ops.FragmentContextImpl; import java.util.Map; /** - * {@link OptionManager} that holds options within {@link org.apache.drill.exec.ops.FragmentContext}. + * {@link OptionManager} that holds options within {@link FragmentContextImpl}. */ public class FragmentOptionManager extends InMemoryOptionManager { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentOptionManager.class); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java index f9d44cc..62d46d6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java @@ -26,7 +26,6 @@ import org.apache.commons.lang3.ArrayUtils; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.map.CaseInsensitiveMap; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.ops.FragmentContextInterface; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.server.options.OptionValue; import org.apache.drill.exec.store.dfs.easy.FileWork; @@ -52,15 +51,6 @@ public class ColumnExplorer { * between actual table columns, partition columns and implicit file columns. * Also populates map with implicit columns names as keys and their values */ - public ColumnExplorer(FragmentContextInterface context, List<SchemaPath> columns) { - this(context.getOptions(), columns); - } - - /** - * Helper class that encapsulates logic for sorting out columns - * between actual table columns, partition columns and implicit file columns. - * Also populates map with implicit columns names as keys and their values - */ public ColumnExplorer(OptionManager optionManager, List<SchemaPath> columns) { this.partitionDesignator = optionManager.getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL); this.columns = columns; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java index f81f74e..d407ec3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java @@ -127,7 +127,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements @SuppressWarnings("resource") CloseableRecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException { - final ColumnExplorer columnExplorer = new ColumnExplorer(context, scan.getColumns()); + final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), scan.getColumns()); if (!columnExplorer.isStarQuery()) { scan = new EasySubScan(scan.getUserName(), scan.getWorkUnits(), scan.getFormatPlugin(), @@ -162,7 +162,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements map.putAll(Maps.difference(map, diff).entriesOnlyOnRight()); } - return new ScanBatch(scan, context, oContext, readers, implicitColumns); + return new ScanBatch(context, oContext, readers, implicitColumns); } public abstract RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java index f9dfd8b..4e71795 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java @@ -20,16 +20,14 @@ package org.apache.drill.exec.store.dfs.easy; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; -public class EasyReaderBatchCreator implements BatchCreator<EasySubScan>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyReaderBatchCreator.class); - +public class EasyReaderBatchCreator implements BatchCreator<EasySubScan> { @Override - public CloseableRecordBatch getBatch(FragmentContext context, EasySubScan config, List<RecordBatch> children) + public CloseableRecordBatch getBatch(ExecutorFragmentContext context, EasySubScan config, List<RecordBatch> children) throws ExecutionSetupException { assert children == null || children.isEmpty(); return config.getFormatPlugin().getReaderBatch(context, config); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java index bfb4188..f3988b1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java @@ -20,16 +20,14 @@ package org.apache.drill.exec.store.dfs.easy; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; public class EasyWriterBatchCreator implements BatchCreator<EasyWriter>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyWriterBatchCreator.class); - @Override - public CloseableRecordBatch getBatch(FragmentContext context, EasyWriter config, List<RecordBatch> children) + public CloseableRecordBatch getBatch(ExecutorFragmentContext context, EasyWriter config, List<RecordBatch> children) throws ExecutionSetupException { assert children != null && children.size() == 1; return config.getFormatPlugin().getWriterBatch(context, children.iterator().next(), config); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java index 8442c32..c7b8cdd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java @@ -21,16 +21,14 @@ import java.util.Collections; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.record.RecordBatch; public class DirectBatchCreator implements BatchCreator<DirectSubScan>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectBatchCreator.class); - @Override - public ScanBatch getBatch(FragmentContext context, DirectSubScan config, List<RecordBatch> children) + public ScanBatch getBatch(ExecutorFragmentContext context, DirectSubScan config, List<RecordBatch> children) throws ExecutionSetupException { return new ScanBatch(config, context, Collections.singletonList(config.getReader())); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java index 4abf7a5..372a91d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java @@ -116,7 +116,7 @@ public class JSONRecordReader extends AbstractRecordReader { this.enableAllTextMode = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR); this.enableNanInf = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR); this.readNumbersAsDouble = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR); - this.unionEnabled = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE); + this.unionEnabled = embeddedContent == null && fragmentContext.getOptions().getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY); this.skipMalformedJSONRecords = fragmentContext.getOptions().getOption(ExecConstants.JSON_SKIP_MALFORMED_RECORDS_VALIDATOR); this.printSkippedMalformedJSONRecordLineNumber = fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG_VALIDATOR); setColumns(columns); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java index a9a30e4..8209252 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java @@ -86,7 +86,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm if (context.getOptions().getOption(ExecConstants.ENABLE_NEW_TEXT_READER_KEY).bool_val == true) { TextParsingSettings settings = new TextParsingSettings(); settings.set((TextFormatConfig)formatConfig); - return new CompliantTextRecordReader(split, dfs, context, settings, columns); + return new CompliantTextRecordReader(split, dfs, settings, columns); } else { char delim = ((TextFormatConfig)formatConfig).getFieldDelimiter(); return new DrillTextRecordReader(split, dfs.getConf(), context, delim, columns); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java index 7009584..2eafdd5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/CompliantTextRecordReader.java @@ -27,7 +27,6 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.store.AbstractRecordReader; @@ -67,7 +66,7 @@ public class CompliantTextRecordReader extends AbstractRecordReader { // operator context for OutputMutator private OperatorContext oContext; - public CompliantTextRecordReader(FileSplit split, DrillFileSystem dfs, FragmentContext context, TextParsingSettings settings, List<SchemaPath> columns) { + public CompliantTextRecordReader(FileSplit split, DrillFileSystem dfs, TextParsingSettings settings, List<SchemaPath> columns) { this.split = split; this.settings = settings; this.dfs = dfs; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java index ce05543..76e3ae2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java @@ -21,17 +21,15 @@ import java.util.Collections; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.store.RecordReader; -public class InfoSchemaBatchCreator implements BatchCreator<InfoSchemaSubScan>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaBatchCreator.class); - +public class InfoSchemaBatchCreator implements BatchCreator<InfoSchemaSubScan> { @Override - public ScanBatch getBatch(FragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children) + public ScanBatch getBatch(ExecutorFragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children) throws ExecutionSetupException { RecordReader rr = config.getTable().getRecordReader(context.getFullRootSchema(), config.getFilter(), context.getOptions()); return new ScanBatch(config, context, Collections.singletonList(rr)); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockRecordReader.java index ac9cb6a..ed01376 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/ExtendedMockRecordReader.java @@ -28,7 +28,6 @@ import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.physical.impl.ScanBatch; @@ -58,7 +57,7 @@ public class ExtendedMockRecordReader extends AbstractRecordReader { private final MockScanEntry config; private final ColumnDef fields[]; - public ExtendedMockRecordReader(FragmentContext context, MockScanEntry config) { + public ExtendedMockRecordReader(MockScanEntry config) { this.config = config; fields = buildColumnDefs(); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java index 108a621..5c9a87a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java @@ -35,8 +35,6 @@ import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.ValueVector; public class MockRecordReader extends AbstractRecordReader { -// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class); - private final MockScanEntry config; private final FragmentContext context; private ValueVector[] valueVectors; @@ -62,7 +60,7 @@ public class MockRecordReader extends AbstractRecordReader { return x; } - private MaterializedField getVector(String name, MajorType type, int length) { + private MaterializedField getVector(String name, MajorType type) { assert context != null : "Context shouldn't be null."; final MaterializedField f = MaterializedField.create(name, type); return f; @@ -80,7 +78,7 @@ public class MockRecordReader extends AbstractRecordReader { for (int i = 0; i < config.getTypes().length; i++) { final MajorType type = config.getTypes()[i].getMajorType(); - final MaterializedField field = getVector(config.getTypes()[i].getName(), type, batchRecordCount); + final MaterializedField field = getVector(config.getTypes()[i].getName(), type); final Class<? extends ValueVector> vvClass = TypeHelper.getValueVectorClass(field.getType().getMinorType(), field.getDataMode()); valueVectors[i] = output.addField(field, vvClass); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java index 8f89eff..01a102d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.store.mock; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.record.RecordBatch; @@ -32,17 +32,15 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP> { - //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class); - @Override - public ScanBatch getBatch(FragmentContext context, MockSubScanPOP config, List<RecordBatch> children) + public ScanBatch getBatch(ExecutorFragmentContext context, MockSubScanPOP config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); final List<MockScanEntry> entries = config.getReadEntries(); final List<RecordReader> readers = Lists.newArrayList(); for(final MockTableDef.MockScanEntry e : entries) { if ( e.isExtended( ) ) { - readers.add(new ExtendedMockRecordReader(context, e)); + readers.add(new ExtendedMockRecordReader(e)); } else { readers.add(new MockRecordReader(context, e)); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java index bc4be13..337f220 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java @@ -25,7 +25,7 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.expression.visitors.AbstractExprVisitor; import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; -import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; +import org.apache.drill.exec.expr.fn.FunctionLookupContext; import org.apache.drill.exec.expr.stat.ParquetFilterPredicate; import org.apache.drill.exec.expr.stat.RangeExprEvaluator; import org.apache.drill.exec.ops.FragmentContext; @@ -74,7 +74,7 @@ public class ParquetRGFilterEvaluator { public static boolean canDrop(LogicalExpression expr, Map<SchemaPath, ColumnStatistics> columnStatisticsMap, - long rowCount, UdfUtilities udfUtilities, FunctionImplementationRegistry functionImplementationRegistry) { + long rowCount, UdfUtilities udfUtilities, FunctionLookupContext functionImplementationRegistry) { ErrorCollector errorCollector = new ErrorCollectorImpl(); LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr( expr, columnStatisticsMap, errorCollector, functionImplementationRegistry); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index bb0b65f..62948b3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -27,7 +27,7 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.Maps; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; @@ -49,7 +49,6 @@ import org.apache.parquet.schema.Type; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; - public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class); @@ -59,12 +58,12 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan @SuppressWarnings("resource") @Override - public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) + public ScanBatch getBatch(ExecutorFragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); OperatorContext oContext = context.newOperatorContext(rowGroupScan); - final ColumnExplorer columnExplorer = new ColumnExplorer(context, rowGroupScan.getColumns()); + final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), rowGroupScan.getColumns()); if (!columnExplorer.isStarQuery()) { rowGroupScan = new ParquetRowGroupScan(rowGroupScan.getUserName(), rowGroupScan.getStorageEngine(), @@ -154,7 +153,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan map.putAll(Maps.difference(map, diff).entriesOnlyOnRight()); } - return new ScanBatch(rowGroupScan, context, oContext, readers, implicitColumns); + return new ScanBatch(context, oContext, readers, implicitColumns); } private static boolean isComplex(ParquetMetadata footer) { http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java index 79c5709..cfb65d5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.store.parquet; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.WriterRecordBatch; import org.apache.drill.exec.record.RecordBatch; @@ -29,7 +29,7 @@ public class ParquetWriterBatchCreator implements BatchCreator<ParquetWriter>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetWriterBatchCreator.class); @Override - public WriterRecordBatch getBatch(FragmentContext context, ParquetWriter config, List<RecordBatch> children) + public WriterRecordBatch getBatch(ExecutorFragmentContext context, ParquetWriter config, List<RecordBatch> children) throws ExecutionSetupException { assert children != null && children.size() == 1; return config.getFormatPlugin().getWriterBatch(context, children.iterator().next(), config); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java index ad1c4bf..04e76b8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetToDrillTypeConverter.java @@ -38,8 +38,7 @@ public class ParquetToDrillTypeConverter { } private static TypeProtos.MinorType getMinorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length, - ConvertedType convertedType, int precision, int scale, - OptionManager options) { + ConvertedType convertedType, int precision, OptionManager options) { switch (primitiveTypeName) { @@ -139,7 +138,7 @@ public class ParquetToDrillTypeConverter { public static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length, TypeProtos.DataMode mode, ConvertedType convertedType, int precision, int scale, OptionManager options) { - MinorType minorType = getMinorType(primitiveTypeName, length, convertedType, precision, scale, options); + MinorType minorType = getMinorType(primitiveTypeName, length, convertedType, precision, options); TypeProtos.MajorType.Builder typeBuilder = TypeProtos.MajorType.newBuilder().setMinorType(minorType).setMode(mode); if (CoreDecimalUtility.isDecimalType(minorType)) { http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java index 6294655..e302e7d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/stat/ParquetFooterStatCollector.java @@ -22,8 +22,6 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; import org.apache.drill.exec.server.options.OptionManager; -import org.apache.drill.exec.store.ParquetOutputRecordWriter; -import org.apache.drill.exec.store.parquet.ParquetGroupScan; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; import org.apache.drill.exec.store.parquet.columnreaders.ParquetToDrillTypeConverter; import org.apache.parquet.column.ColumnDescriptor; @@ -37,7 +35,6 @@ import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.joda.time.DateTimeConstants; -import org.joda.time.DateTimeUtils; import java.util.ArrayList; import java.util.HashMap; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java index 15a56a2..554db10 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BitToUserConnectionIterator.java @@ -26,7 +26,7 @@ import java.util.Map.Entry; import java.util.TimeZone; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.rpc.user.UserServer.BitToUserConnection; import org.apache.drill.exec.rpc.user.UserServer.BitToUserConnectionConfig; import org.apache.drill.exec.rpc.user.UserSession; @@ -45,13 +45,13 @@ public class BitToUserConnectionIterator implements Iterator<Object> { private String queryingUsername; private boolean isAdmin; - public BitToUserConnectionIterator(FragmentContext context) { + public BitToUserConnectionIterator(ExecutorFragmentContext context) { queryingUsername = context.getQueryUserName(); isAdmin = hasAdminPrivileges(context); itr = iterateConnectionInfo(context); } - private boolean hasAdminPrivileges(FragmentContext context) { + private boolean hasAdminPrivileges(ExecutorFragmentContext context) { OptionManager options = context.getOptions(); if (context.isUserAuthenticationEnabled() && !ImpersonationUtil.hasAdminPrivileges( @@ -65,11 +65,10 @@ public class BitToUserConnectionIterator implements Iterator<Object> { return true; } - private Iterator<ConnectionInfo> iterateConnectionInfo(FragmentContext context) { - Set<Entry<BitToUserConnection, BitToUserConnectionConfig>> activeConnections = - context.getDrillbitContext().getUserConnections(); + private Iterator<ConnectionInfo> iterateConnectionInfo(ExecutorFragmentContext context) { + Set<Entry<BitToUserConnection, BitToUserConnectionConfig>> activeConnections = context.getUserConnections(); - String hostname = context.getIdentity().getAddress(); + String hostname = context.getEndpoint().getAddress(); List<ConnectionInfo> connectionInfos = new LinkedList<ConnectionInfo>(); for (Entry<BitToUserConnection, BitToUserConnectionConfig> connection : activeConnections) { http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java index dc4e7c7..08d3706 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java @@ -19,7 +19,7 @@ package org.apache.drill.exec.store.sys; import java.util.Iterator; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; public class DrillbitIterator implements Iterator<Object> { @@ -28,9 +28,9 @@ public class DrillbitIterator implements Iterator<Object> { private Iterator<DrillbitEndpoint> endpoints; private DrillbitEndpoint current; - public DrillbitIterator(FragmentContext c) { - this.endpoints = c.getDrillbitContext().getAvailableBits().iterator(); - this.current = c.getIdentity(); + public DrillbitIterator(ExecutorFragmentContext c) { + this.endpoints = c.getBits().iterator(); + this.current = c.getEndpoint(); } public static class DrillbitInstance { http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java index e624ada..8d437a6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java @@ -25,15 +25,15 @@ import java.util.Iterator; import java.util.List; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; public class MemoryIterator implements Iterator<Object> { private boolean beforeFirst = true; - private final FragmentContext context; + private final ExecutorFragmentContext context; - public MemoryIterator(final FragmentContext context) { + public MemoryIterator(final ExecutorFragmentContext context) { this.context = context; } @@ -50,7 +50,7 @@ public class MemoryIterator implements Iterator<Object> { beforeFirst = false; final MemoryInfo memoryInfo = new MemoryInfo(); - final DrillbitEndpoint endpoint = context.getIdentity(); + final DrillbitEndpoint endpoint = context.getEndpoint(); memoryInfo.hostname = endpoint.getAddress(); memoryInfo.user_port = endpoint.getUserPort(); @@ -62,7 +62,7 @@ public class MemoryIterator implements Iterator<Object> { memoryInfo.jvm_direct_current = directBean.getMemoryUsed(); - memoryInfo.direct_current = context.getDrillbitContext().getAllocator().getAllocatedMemory(); + memoryInfo.direct_current = context.getAllocator().getAllocatedMemory(); memoryInfo.direct_max = DrillConfig.getMaxDirectMemory(); return memoryInfo; } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java index e9282c3..eef6604 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileInfoIterator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.store.sys; import com.google.common.base.Function; import com.google.common.collect.Iterators; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.QueryProfile; @@ -37,7 +37,7 @@ public class ProfileInfoIterator extends ProfileIterator { private final Iterator<ProfileInfo> itr; - public ProfileInfoIterator(FragmentContext context) { + public ProfileInfoIterator(ExecutorFragmentContext context) { super(context); itr = iterateProfileInfo(); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java index f1d2075..6112eb1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileIterator.java @@ -23,7 +23,7 @@ import java.util.List; import java.util.Map.Entry; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.proto.UserBitShared.QueryProfile; import org.apache.drill.exec.server.QueryProfileStoreContext; import org.apache.drill.exec.server.options.OptionManager; @@ -37,17 +37,15 @@ public abstract class ProfileIterator implements Iterator<Object> { protected final String queryingUsername; protected final boolean isAdmin; - public ProfileIterator(FragmentContext context) { + public ProfileIterator(ExecutorFragmentContext context) { //Grab profile Store Context - profileStoreContext = context - .getDrillbitContext() - .getProfileStoreContext(); + profileStoreContext = context.getProfileStoreContext(); queryingUsername = context.getQueryUserName(); isAdmin = hasAdminPrivileges(context); } - protected boolean hasAdminPrivileges(FragmentContext context) { + protected boolean hasAdminPrivileges(ExecutorFragmentContext context) { OptionManager options = context.getOptions(); if (context.isUserAuthenticationEnabled() && !ImpersonationUtil.hasAdminPrivileges( http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java index fcc2921..67f1165 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ProfileJsonIterator.java @@ -23,7 +23,7 @@ import java.util.Map.Entry; import javax.annotation.Nullable; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.QueryProfile; import org.apache.drill.exec.serialization.InstanceSerializer; @@ -40,7 +40,7 @@ public class ProfileJsonIterator extends ProfileIterator { private final InstanceSerializer<QueryProfile> profileSerializer; private final Iterator<ProfileJson> itr; - public ProfileJsonIterator(FragmentContext context) { + public ProfileJsonIterator(ExecutorFragmentContext context) { super(context); //Holding a serializer (for JSON extract) profileSerializer = profileStoreContext. http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java index 034f70c..a49ff9a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTable.java @@ -19,7 +19,7 @@ package org.apache.drill.exec.store.sys; import java.util.Iterator; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.store.sys.OptionIterator.OptionValueWrapper; /** @@ -33,90 +33,88 @@ import org.apache.drill.exec.store.sys.OptionIterator.OptionValueWrapper; public enum SystemTable { OPTION("options", false, OptionValueWrapper.class) { @Override - public Iterator<Object> getIterator(final FragmentContext context) { + public Iterator<Object> getIterator(final ExecutorFragmentContext context) { return new OptionIterator(context, OptionIterator.Mode.SYS_SESS_PUBLIC); } }, OPTION_VAL("options_val", false, ExtendedOptionIterator.ExtendedOptionValueWrapper.class) { @Override - public Iterator<Object> getIterator(final FragmentContext context) { + public Iterator<Object> getIterator(final ExecutorFragmentContext context) { return new ExtendedOptionIterator(context, false); } }, INTERNAL_OPTIONS("internal_options", false, OptionValueWrapper.class) { @Override - public Iterator<Object> getIterator(final FragmentContext context) { + public Iterator<Object> getIterator(final ExecutorFragmentContext context) { return new OptionIterator(context, OptionIterator.Mode.SYS_SESS_INTERNAL); } }, INTERNAL_OPTIONS_VAL("internal_options_val", false, ExtendedOptionIterator.ExtendedOptionValueWrapper.class) { @Override - public Iterator<Object> getIterator(final FragmentContext context) { + public Iterator<Object> getIterator(final ExecutorFragmentContext context) { return new ExtendedOptionIterator(context, true); } }, BOOT("boot", false, OptionValueWrapper.class) { @Override - public Iterator<Object> getIterator(final FragmentContext context) { + public Iterator<Object> getIterator(final ExecutorFragmentContext context) { return new OptionIterator(context, OptionIterator.Mode.BOOT); } }, DRILLBITS("drillbits", false,DrillbitIterator.DrillbitInstance.class) { @Override - public Iterator<Object> getIterator(final FragmentContext context) { + public Iterator<Object> getIterator(final ExecutorFragmentContext context) { return new DrillbitIterator(context); } }, VERSION("version", false, VersionIterator.VersionInfo.class) { @Override - public Iterator<Object> getIterator(final FragmentContext context) { + public Iterator<Object> getIterator(final ExecutorFragmentContext context) { return new VersionIterator(); } }, MEMORY("memory", true, MemoryIterator.MemoryInfo.class) { @Override - public Iterator<Object> getIterator(final FragmentContext context) { + public Iterator<Object> getIterator(final ExecutorFragmentContext context) { return new MemoryIterator(context); } }, CONNECTIONS("connections", true, BitToUserConnectionIterator.ConnectionInfo.class) { @Override - public Iterator<Object> getIterator(final FragmentContext context) { + public Iterator<Object> getIterator(final ExecutorFragmentContext context) { return new BitToUserConnectionIterator(context); } }, PROFILES("profiles", false, ProfileInfoIterator.ProfileInfo.class) { @Override - public Iterator<Object> getIterator(final FragmentContext context) { + public Iterator<Object> getIterator(final ExecutorFragmentContext context) { return new ProfileInfoIterator(context); } }, PROFILES_JSON("profiles_json", false, ProfileJsonIterator.ProfileJson.class) { @Override - public Iterator<Object> getIterator(final FragmentContext context) { + public Iterator<Object> getIterator(final ExecutorFragmentContext context) { return new ProfileJsonIterator(context); } }, THREADS("threads", true, ThreadsIterator.ThreadsInfo.class) { @Override - public Iterator<Object> getIterator(final FragmentContext context) { + public Iterator<Object> getIterator(final ExecutorFragmentContext context) { return new ThreadsIterator(context); } }; -// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTable.class); - private final String tableName; private final boolean distributed; private final Class<?> pojoClass; @@ -127,7 +125,7 @@ public enum SystemTable { this.pojoClass = pojoClass; } - public Iterator<Object> getIterator(final FragmentContext context) { + public Iterator<Object> getIterator(final ExecutorFragmentContext context) { throw new UnsupportedOperationException(tableName + " must override this method."); } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java index ab87a4a..c0ef0d0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java @@ -23,7 +23,7 @@ import java.util.List; import com.google.common.collect.ImmutableList; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.record.RecordBatch; @@ -39,8 +39,8 @@ public class SystemTableBatchCreator implements BatchCreator<SystemTableScan> { @SuppressWarnings({ "rawtypes", "unchecked" }) @Override - public ScanBatch getBatch(final FragmentContext context, final SystemTableScan scan, - final List<RecordBatch> children) + public ScanBatch getBatch(final ExecutorFragmentContext context, final SystemTableScan scan, + final List<RecordBatch> children) throws ExecutionSetupException { final SystemTable table = scan.getTable(); final Iterator<Object> iterator = table.getIterator(context); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java index 681119d..567d299 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java @@ -21,15 +21,15 @@ import java.lang.management.ManagementFactory; import java.lang.management.ThreadMXBean; import java.util.Iterator; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; public class ThreadsIterator implements Iterator<Object> { private boolean beforeFirst = true; - private final FragmentContext context; + private final ExecutorFragmentContext context; - public ThreadsIterator(final FragmentContext context) { + public ThreadsIterator(final ExecutorFragmentContext context) { this.context = context; } @@ -46,7 +46,7 @@ public class ThreadsIterator implements Iterator<Object> { beforeFirst = false; final ThreadsInfo threadsInfo = new ThreadsInfo(); - final DrillbitEndpoint endpoint = context.getIdentity(); + final DrillbitEndpoint endpoint = context.getEndpoint(); threadsInfo.hostname = endpoint.getAddress(); threadsInfo.user_port = endpoint.getUserPort(); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ControlsInjector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ControlsInjector.java index c7bdbad..cfcb9e0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ControlsInjector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ControlsInjector.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.testing; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.FragmentContextImpl; import org.slf4j.Logger; public interface ControlsInjector { @@ -43,19 +43,6 @@ public interface ControlsInjector { void injectUnchecked(ExecutionControls executionControls, String desc); /** - * Inject (throw) an unchecked exception at this point, if the fragmentContext is not null, - * an injection is specified, and it is time for it to be thrown. - * <p/> - * <p>Implementors use this in their code at a site where they want to simulate an exception - * during testing. - * - * @param fragmentContext fragmentContext used to retrieve the controls, can be null - * @param desc the site description - * throws the exception specified by the injection, if it is time - */ - void injectUnchecked(FragmentContext fragmentContext, String desc); - - /** * Inject (throw) a checked exception at this point, if an injection is specified, and it is time * for it to be thrown. * <p/> http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java index 82646ea..78c510a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControls.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.google.common.annotations.VisibleForTesting; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.ExecConstants; @@ -131,6 +132,11 @@ public final class ExecutionControls { private final DrillbitEndpoint endpoint; // the current endpoint + @VisibleForTesting + public ExecutionControls(final OptionManager options) { + this(options, null); + } + public ExecutionControls(final OptionManager options, final DrillbitEndpoint endpoint) { this.endpoint = endpoint; http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java index 3a74fee..e5234f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExecutionControlsInjector.java @@ -19,16 +19,15 @@ package org.apache.drill.exec.testing; import java.util.concurrent.TimeUnit; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.FragmentContextImpl; import com.google.common.base.Preconditions; import org.slf4j.Logger; /** * Injects exceptions and pauses at execution time for testing. Any class that wants to simulate exceptions * or inject pauses for testing should have it's own private static instance of an injector (similar to the use - * of loggers). Injection site either use {@link org.apache.drill.exec.ops.FragmentContext} or - * {@link org.apache.drill.exec.ops.QueryContext}. See {@link org.apache.drill.exec.testing.TestExceptionInjection} and - * {@link org.apache.drill.exec.testing.TestPauseInjection} for examples of use. + * of loggers). Injection site either use {@link FragmentContextImpl} or + * {@link org.apache.drill.exec.ops.QueryContext}. * See {@link ControlsInjector} for documentation. */ public class ExecutionControlsInjector implements ControlsInjector { @@ -60,13 +59,6 @@ public class ExecutionControlsInjector implements ControlsInjector { } @Override - public void injectUnchecked(final FragmentContext fragmentContext, final String desc) { - if (fragmentContext != null) { - injectUnchecked(fragmentContext.getExecutionControls(), desc); - } - } - - @Override public <T extends Throwable> void injectChecked(final ExecutionControls executionControls, final String desc, final Class<T> exceptionClass) throws T { Preconditions.checkNotNull(executionControls); http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java index b8ec44d..ae853d8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/NoOpControlsInjector.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.testing; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.FragmentContextImpl; import org.slf4j.Logger; /** @@ -48,10 +48,6 @@ public final class NoOpControlsInjector implements ControlsInjector { } @Override - public void injectUnchecked(final FragmentContext fragmentContext, final String desc) { - } - - @Override public <T extends Throwable> void injectChecked( final ExecutionControls executionControls, final String desc, final Class<T> exceptionClass) throws T { } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java index 8a3e5b6..793cc01 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java @@ -29,9 +29,7 @@ import org.apache.drill.exec.util.ArrayWrappedIntIntMap; import com.google.common.base.Preconditions; -public abstract class AbstractDataCollector implements DataCollector{ - - // private final List<MinorFragmentEndpoint> incoming; +public abstract class AbstractDataCollector implements DataCollector { private final int oppositeMajorFragmentId; private final AtomicIntegerArray remainders; private final AtomicInteger remainingRequired; @@ -42,7 +40,6 @@ public abstract class AbstractDataCollector implements DataCollector{ /** * @param parentAccounter - * @param receiver * @param numBuffers Number of RawBatchBuffer inputs required to store the incoming data * @param bufferCapacity Capacity of each RawBatchBuffer. * @param context @@ -74,7 +71,7 @@ public abstract class AbstractDataCollector implements DataCollector{ if (spooling) { buffers[i] = new SpoolingRawBatchBuffer(context, bufferCapacity, collector.getOppositeMajorFragmentId(), i); } else { - buffers[i] = new UnlimitedRawBatchBuffer(context, bufferCapacity, collector.getOppositeMajorFragmentId()); + buffers[i] = new UnlimitedRawBatchBuffer(context, bufferCapacity); } } } http://git-wip-us.apache.org/repos/asf/drill/blob/186536d5/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java index f15a3e6..382ab1f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java @@ -35,13 +35,13 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer { } protected interface BufferQueue<T> { - public void addOomBatch(RawFragmentBatch batch); - public RawFragmentBatch poll() throws IOException; - public RawFragmentBatch take() throws IOException, InterruptedException; - public boolean checkForOutOfMemory(); - public int size(); - public boolean isEmpty(); - public void add(T obj); + void addOomBatch(RawFragmentBatch batch); + RawFragmentBatch poll() throws IOException; + RawFragmentBatch take() throws IOException, InterruptedException; + boolean checkForOutOfMemory(); + int size(); + boolean isEmpty(); + void add(T obj); } protected BufferQueue<T> bufferQueue; @@ -74,7 +74,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer { // if this fragment is already canceled or failed, we shouldn't need any or more stuff. We do the null check to // ensure that tests run. - if (context != null && !context.shouldContinue()) { + if (context != null && !context.getExecutorState().shouldContinue()) { this.kill(context); } @@ -102,14 +102,14 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer { // ## Add assertion that all acks have been sent. TODO @Override public void close() { - if (!isTerminated() && context.shouldContinue()) { + if (!isTerminated() && context.getExecutorState().shouldContinue()) { final String msg = String.format("Cleanup before finished. %d out of %d strams have finished", completedStreams(), fragmentCount); throw new IllegalStateException(msg); } if (!bufferQueue.isEmpty()) { - if (context.shouldContinue()) { - context.fail(new IllegalStateException("Batches still in queue during cleanup")); + if (context.getExecutorState().shouldContinue()) { + context.getExecutorState().fail(new IllegalStateException("Batches still in queue during cleanup")); logger.error("{} Batches in queue.", bufferQueue.size()); } clearBufferWithBody(); @@ -133,7 +133,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer { batch = bufferQueue.poll(); assertAckSent(batch); } catch (IOException e) { - context.fail(e); + context.getExecutorState().fail(e); continue; } if (batch.getBody() != null) { @@ -172,7 +172,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer { } catch (final InterruptedException e) { // We expect that the interrupt means the fragment is canceled or failed, so we should kill this buffer - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { kill(context); } else { throw new DrillRuntimeException("Interrupted but context.shouldContinue() is true", e); @@ -184,7 +184,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer { return null; } - if (context.isOverMemoryLimit()) { + if (context.getAllocator().isOverLimit()) { outOfMemory.set(true); }