DRILL-6153: Operator framework closes #1121
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/69a5f3a9 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/69a5f3a9 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/69a5f3a9 Branch: refs/heads/master Commit: 69a5f3a9c4fadafc588a3e325a12b98cbf359ece Parents: 4ee207b Author: Paul Rogers <prog...@cloudera.com> Authored: Mon Feb 12 22:27:23 2018 -0800 Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com> Committed: Sat Mar 3 19:47:53 2018 +0200 ---------------------------------------------------------------------- .../physical/impl/protocol/BatchAccessor.java | 50 ++ .../physical/impl/protocol/OperatorDriver.java | 234 ++++++ .../physical/impl/protocol/OperatorExec.java | 127 ++++ .../impl/protocol/OperatorRecordBatch.java | 156 ++++ .../physical/impl/protocol/SchemaTracker.java | 98 +++ .../impl/protocol/VectorContainerAccessor.java | 134 ++++ .../physical/impl/protocol/package-info.java | 29 + .../impl/protocol/TestOperatorRecordBatch.java | 747 +++++++++++++++++++ 8 files changed, 1575 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/BatchAccessor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/BatchAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/BatchAccessor.java new file mode 100644 index 0000000..b22353f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/BatchAccessor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.protocol; + +import java.util.Iterator; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; + +/** + * Provides access to the row set (record batch) produced by an + * operator. Previously, a record batch <i>was</i> an operator. + * In this version, the row set is a service of the operator rather + * than being part of the operator. + */ + +public interface BatchAccessor { + BatchSchema getSchema(); + int schemaVersion(); + int getRowCount(); + VectorContainer getOutgoingContainer(); + TypedFieldId getValueVectorId(SchemaPath path); + VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... 
ids); + WritableBatch getWritableBatch(); + SelectionVector2 getSelectionVector2(); + SelectionVector4 getSelectionVector4(); + Iterator<VectorWrapper<?>> iterator(); + void release(); +} http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java new file mode 100644 index 0000000..9e6190c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.protocol; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.record.RecordBatch.IterOutcome; + +/** + * State machine that drives the operator executable. Converts + * between the iterator protocol and the operator executable protocol. 
+ * Implemented as a separate class in anticipation of eventually + * changing the record batch (iterator) protocol. + */ + +public class OperatorDriver { + public enum State { + + /** + * Before the first call to next(). + */ + + START, + + /** + * The first call to next() has been made and schema (only) + * was returned. On the subsequent call to next(), return any + * data that might have accompanied that first batch. + */ + + SCHEMA, + + /** + * The second call to next() has been made and there is more + * data to deliver on subsequent calls. + */ + + RUN, + + /** + * No more data to deliver. + */ + + END, + + /** + * An error occurred. + */ + + FAILED, + + /** + * Operation was cancelled. No more batches will be returned, + * but close() has not yet been called. + */ + + CANCELED, + + /** + * close() called and resources are released. No more batches + * will be returned, and close() has been called. + * (This state is semantically identical to FAILED, it exists just + * in case an implementation needs to know the difference between the + * END, FAILED and CANCELED states.) + */ + + CLOSED + } + + private OperatorDriver.State state = State.START; + + /** + * Operator context. The driver "owns" the context and is responsible + * for closing it. + */ + + private final OperatorContext opContext; + private final OperatorExec operatorExec; + private final BatchAccessor batchAccessor; + private int schemaVersion; + + public OperatorDriver(OperatorContext opContext, OperatorExec opExec) { + this.opContext = opContext; + this.operatorExec = opExec; + batchAccessor = operatorExec.batchAccessor(); + } + + /** + * Get the next batch. Performs initialization on the first call. 
+ * @return the iteration outcome to send downstream + */ + + public IterOutcome next() { + try { + switch (state) { + case START: + return start(); + case RUN: + return doNext(); + default: + OperatorRecordBatch.logger.debug("Extra call to next() in state " + state + ": " + operatorLabel()); + return IterOutcome.NONE; + } + } catch (UserException e) { + cancelSilently(); + state = State.FAILED; + throw e; + } catch (Throwable t) { + cancelSilently(); + state = State.FAILED; + throw UserException.executionError(t) + .addContext("Exception thrown from", operatorLabel()) + .build(OperatorRecordBatch.logger); + } + } + + /** + * Cancels the operator before reaching EOF. + */ + + public void cancel() { + try { + switch (state) { + case START: + case RUN: + cancelSilently(); + break; + default: + break; + } + } finally { + state = State.CANCELED; + } + } + + /** + * Start the operator executor. Bind it to the various contexts. + * Then start the executor and fetch the first schema. + * @return result of the first batch, which should contain + * only a schema, or EOF + */ + + private IterOutcome start() { + state = State.SCHEMA; + if (operatorExec.buildSchema()) { + schemaVersion = batchAccessor.schemaVersion(); + state = State.RUN; + return IterOutcome.OK_NEW_SCHEMA; + } else { + state = State.END; + return IterOutcome.NONE; + } + } + + /** + * Fetch a record batch, detecting EOF and a new schema. + * @return the <tt>IterOutcome</tt> for the above cases + */ + + private IterOutcome doNext() { + if (! operatorExec.next()) { + state = State.END; + return IterOutcome.NONE; + } + int newVersion = batchAccessor.schemaVersion(); + if (newVersion != schemaVersion) { + schemaVersion = newVersion; + return IterOutcome.OK_NEW_SCHEMA; + } + return IterOutcome.OK; + } + + /** + * Implement a cancellation, and ignore any exception that is + * thrown. We're already in trouble here, no need to keep track + * of additional things that go wrong. 
+ */ + + private void cancelSilently() { + try { + if (state == State.SCHEMA || state == State.RUN) { + operatorExec.cancel(); + } + } catch (Throwable t) { + // Ignore; we're already in a bad state. + OperatorRecordBatch.logger.error("Exception thrown from cancel() for " + operatorLabel(), t); + } + } + + private String operatorLabel() { + return operatorExec.getClass().getCanonicalName(); + } + + public void close() { + if (state == State.CLOSED) { + return; + } + try { + operatorExec.close(); + } catch (UserException e) { + throw e; + } catch (Throwable t) { + throw UserException.executionError(t) + .addContext("Exception thrown from", operatorLabel()) + .build(OperatorRecordBatch.logger); + } finally { + opContext.close(); + state = State.CLOSED; + } + } + + public BatchAccessor batchAccessor() { + return batchAccessor; + } + + public OperatorContext operatorContext() { + return opContext; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorExec.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorExec.java new file mode 100644 index 0000000..57a8cf3 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorExec.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.protocol; + +import org.apache.drill.exec.ops.OperatorContext; + +/** + * Core protocol for a Drill operator execution. + * + * <h4>Lifecycle</h4> + * + * <ul> + * <li>Creation via an operator-specific constructor in the + * corresponding <tt>RecordBatchCreator</tt>.</li> + * <li><tt>bind()</tt> called to provide the operator services.</li> + * <li><tt>buildSchema()</tt> called to define the schema before + * fetching the first record batch.</li> + * <li><tt>next()</tt> called repeatedly to prepare each new record + * batch until EOF or until cancellation.</li> + * <li><tt>cancel()</tt> called if the operator should quit early.</li> + * <li><tt>close()</tt> called to release resources. Note that + * <tt>close()</tt> is called in response to:<ul> + * <li>EOF</li> + * <li>After <tt>cancel()</tt></li> + * <li>After an exception is thrown.</li></ul></li> + * </ul> + * + * <h4>Error Handling</h4> + * + * Any method can throw an (unchecked) exception. (Drill does not use + * checked exceptions.) Preferably, the code will throw a + * <tt>UserException</tt> that explains the error to the user. If any + * other kind of exception is thrown, then the enclosing class wraps it + * in a generic <tt>UserException</tt> that indicates that "something went + * wrong", which is less than ideal. + * + * <h4>Result Set</h4> + * The operator "publishes" a result set in response to returning + * <tt>true</tt> from <tt>next()</tt> by populating a + * {@link BatchAccessor} provided via {@link #batchAccessor()}. 
For + * compatibility with other Drill operators, the set of vectors within + * the batch must be the same from one batch to the next. + */ + +public interface OperatorExec { + + /** + * Bind this operator to the context. The context provides access + * to per-operator, per-fragment and per-Drillbit services. + * Also provides access to the operator definition (AKA "pop + * config") for this operator. + * + * @param context operator context + */ + + public void bind(OperatorContext context); + + /** + * Provides a generic access mechanism to the batch's output data. + * This method is called after a successful return from + * {@link #buildSchema()} and {@link #next()}. The batch itself + * can be held in a standard {@link VectorContainer}, or in some + * other structure more convenient for this operator. + * + * @return the access for the batch's output container + */ + + BatchAccessor batchAccessor(); + + /** + * Retrieves the schema of the batch before the first actual batch + * of data. The schema is returned via an empty batch (no rows, + * only schema) from {@link #batchAccessor()}. + * + * @return true if a schema is available, false if the operator + * reached EOF before a schema was found + */ + + boolean buildSchema(); + + /** + * Retrieves the next batch of data. The data is returned via + * the {@link #batchAccessor()} method. + * + * @return true if another batch of data is available, false if + * EOF was reached and no more data is available + */ + + boolean next(); + + /** + * Alerts the operator that the query was cancelled. Generally + * optional, but allows the operator to realize that a cancellation + * was requested. + */ + + void cancel(); + + /** + * Close the operator by releasing all resources that the operator + * held. Called after {@link #cancel()} and after {@link #buildSchema()} + * or {@link #next()} returns false. 
+ * <p> + * Note that there may be a significant delay between the last call to + * <tt>next()</tt> and the call to <tt>close()</tt> during which + * downstream operators do their work. A tidy operator will release + * resources immediately after EOF to avoid holding onto memory or other + * resources that could be used by downstream operators. + */ + + void close(); +} http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java new file mode 100644 index 0000000..4f0cff8 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorRecordBatch.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.protocol; + +import java.util.Iterator; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.CloseableRecordBatch; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; + +/** + * Modular implementation of the standard Drill record batch iterator + * protocol. The protocol has two parts: control of the operator and + * access to the record batch. Each is encapsulated in separate + * implementation classes to allow easier customization for each + * situation. The operator internals are, themselves, abstracted to + * yet another class with the steps represented as method calls rather + * than as internal states as in the record batch iterator protocol. + * <p> + * Note that downstream operators make an assumption that the + * same vectors will appear from one batch to the next. That is, + * not only must the schema be the same, but if column "a" appears + * in two batches, the same value vector must back "a" in both + * batches. The <tt>TransferPair</tt> abstraction fails if different + * vectors appear across batches. 
+ */ + +public class OperatorRecordBatch implements CloseableRecordBatch { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorRecordBatch.class); + + private final OperatorDriver driver; + private final BatchAccessor batchAccessor; + + public OperatorRecordBatch(FragmentContext context, PhysicalOperator config, OperatorExec opExec) { + OperatorContext opContext = context.newOperatorContext(config); + opContext.getStats().startProcessing(); + + // Chicken-and-egg binding: the two objects must know about each other. Pass the + // context to the operator exec via a bind method. + + try { + opExec.bind(opContext); + driver = new OperatorDriver(opContext, opExec); + batchAccessor = opExec.batchAccessor(); + } catch (UserException e) { + opContext.close(); + throw e; + } catch (Throwable t) { + opContext.close(); + throw UserException.executionError(t) + .addContext("Exception thrown from", opExec.getClass().getSimpleName() + ".bind()") + .build(logger); + } + finally { + opContext.getStats().stopProcessing(); + } + } + + @Override + public FragmentContext getContext() { + return fragmentContext(); + } + + // No longer needed, can be removed after all + // batch size control work is committed. + + public FragmentContext fragmentContext() { + return driver.operatorContext().getFragmentContext(); + } + + @Override + public BatchSchema getSchema() { return batchAccessor.getSchema(); } + + @Override + public int getRecordCount() { return batchAccessor.getRowCount(); } + + @Override + public VectorContainer getOutgoingContainer() { + return batchAccessor.getOutgoingContainer(); + } + + @Override + public TypedFieldId getValueVectorId(SchemaPath path) { + return batchAccessor.getValueVectorId(path); + } + + @Override + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... 
ids) { + return batchAccessor.getValueAccessorById(clazz, ids); + } + + @Override + public WritableBatch getWritableBatch() { + return batchAccessor.getWritableBatch(); + } + + @Override + public SelectionVector2 getSelectionVector2() { + return batchAccessor.getSelectionVector2(); + } + + @Override + public SelectionVector4 getSelectionVector4() { + return batchAccessor.getSelectionVector4(); + } + + @Override + public Iterator<VectorWrapper<?>> iterator() { + return batchAccessor.iterator(); + } + + @Override + public void kill(boolean sendUpstream) { + driver.cancel(); + } + + @Override + public IterOutcome next() { + try { + driver.operatorContext().getStats().startProcessing(); + return driver.next(); + } finally { + driver.operatorContext().getStats().stopProcessing(); + } + } + + @Override + public void close() { + driver.close(); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java new file mode 100644 index 0000000..cd7c296 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/SchemaTracker.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.protocol; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.ValueVector; + +/** + * Tracks changes to schemas via "snapshots" over time. That is, given + * a schema, tracks if a new schema is the same as the current one. For + * example, each batch output from a series of readers might be compared, + * as they are returned, to detect schema changes from one batch to the + * next. This class does not track vector-by-vector changes as a schema + * is built, but rather periodic "snapshots" at times determined by the + * operator. + * <p> + * If an operator is guaranteed to emit a consistent schema, then no + * checks need be done, and this tracker will report no schema change. + * On the other hand, a scanner might check schema more often. At least + * once per reader, and more often if a reader is "late-schema": if the + * reader can change schema batch-by-batch. + * <p> + * Drill defines "schema change" in a very specific way. Not only must + * the set of columns be the same, and have the same types, it must also + * be the case that the <b>vectors</b> that hold the columns be identical. + * Generated code contains references to specific vector objects; passing + * along different vectors requires new code to be generated and is treated + * as a schema change. 
+ * <p> + * Drill has no concept of "same schema, different vectors." A change in + * vector is just as serious as a change in schema. Hence, operators + * try to use the same vectors for their entire lives. That is the change + * tracked here. + */ + +// TODO: Does not handle SV4 situations + +public class SchemaTracker { + + private int schemaVersion; + private BatchSchema currentSchema; + private List<ValueVector> currentVectors = new ArrayList<>(); + + public void trackSchema(VectorContainer newBatch) { + + if (! isSameSchema(newBatch)) { + schemaVersion++; + captureSchema(newBatch); + } + } + + private boolean isSameSchema(VectorContainer newBatch) { + if (currentVectors.size() != newBatch.getNumberOfColumns()) { + return false; + } + + // Compare vectors by identity: not just same type, + // must be same instance. + + for (int i = 0; i < currentVectors.size(); i++) { + if (currentVectors.get(i) != newBatch.getValueVector(i).getValueVector()) { + return false; + } + } + return true; + } + + private void captureSchema(VectorContainer newBatch) { + currentVectors.clear(); + for (VectorWrapper<?> vw : newBatch) { + currentVectors.add(vw.getValueVector()); + } + currentSchema = newBatch.getSchema(); + } + + public int schemaVersion() { return schemaVersion; } + public BatchSchema schema() { return currentSchema; } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java new file mode 100644 index 0000000..e2d78d7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/VectorContainerAccessor.java @@ -0,0 
+1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.protocol; + +import java.util.Collections; +import java.util.Iterator; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; + +public class VectorContainerAccessor implements BatchAccessor { + + public static class ContainerAndSv2Accessor extends VectorContainerAccessor { + + private SelectionVector2 sv2; + + public void setSelectionVector(SelectionVector2 sv2) { + this.sv2 = sv2; + } + + @Override + public SelectionVector2 getSelectionVector2() { + return sv2; + } + } + + public static class ContainerAndSv4Accessor extends VectorContainerAccessor { + + private SelectionVector4 sv4; + + @Override + public SelectionVector4 getSelectionVector4() { + return sv4; + } + } + + private VectorContainer container; + private 
SchemaTracker schemaTracker = new SchemaTracker(); + + /** + * Set the vector container. Done initially, and any time the schema of + * the container may have changed. May be called with the same container + * as the previous call, or a different one. A schema change occurs + * unless the vectors are identical across the two containers. + * + * @param container the container that holds vectors to be sent + * downstream + */ + + public void setContainer(VectorContainer container) { + this.container = container; + if (container != null) { + schemaTracker.trackSchema(container); + } + } + + @Override + public BatchSchema getSchema() { + return container == null ? null : container.getSchema(); + } + + @Override + public int schemaVersion() { return schemaTracker.schemaVersion(); } + + @Override + public int getRowCount() { + return container == null ? 0 : container.getRecordCount(); + } + + @Override + public VectorContainer getOutgoingContainer() { return container; } + + @Override + public TypedFieldId getValueVectorId(SchemaPath path) { + return container.getValueVectorId(path); + } + + @Override + public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) { + return container.getValueAccessorById(clazz, ids); + } + + @Override + public WritableBatch getWritableBatch() { + return WritableBatch.get(container); + } + + @Override + public SelectionVector2 getSelectionVector2() { + // Throws an exception by default because containers + // do not support selection vectors. + return container.getSelectionVector2(); + } + + @Override + public SelectionVector4 getSelectionVector4() { + // Throws an exception by default because containers + // do not support selection vectors. 
+ return container.getSelectionVector4(); + } + + @Override + public Iterator<VectorWrapper<?>> iterator() { + if (container == null) { + return Collections.emptyIterator(); + } else { + return container.iterator(); + } + } + + @Override + public void release() { container.zeroVectors(); } +} http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/package-info.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/package-info.java new file mode 100644 index 0000000..11af47c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/package-info.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Defines a revised implementation of the Drill RecordBatch protocol. This + * version separates concerns into specific classes, and creates a single + * "shim" class to implement the iterator protocol, deferring to specific + * classes as needed. 
+ * <p> + * This version is an eventual successor to the original implementation which + * used the "kitchen sink" pattern to combine all functionality into a single, + * large record batch implementation. + */ + +package org.apache.drill.exec.physical.impl.protocol; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/69a5f3a9/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java new file mode 100644 index 0000000..19946dd --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/protocol/TestOperatorRecordBatch.java @@ -0,0 +1,747 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.protocol; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Iterator; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.Limit; +import org.apache.drill.exec.physical.impl.protocol.VectorContainerAccessor.ContainerAndSv2Accessor; +import org.apache.drill.exec.proto.UserBitShared.NamePart; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatch.IterOutcome; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.IntVector; +import org.apache.drill.exec.vector.VarCharVector; +import org.apache.drill.test.SubOperatorTest; +import org.apache.drill.test.rowSet.RowSet.SingleRowSet; +import org.apache.drill.test.rowSet.SchemaBuilder; +import org.junit.Test; + +/** + * Test the implementation of the Drill Volcano iterator protocol that + * wraps the modular operator implementation. 
+ */ + +public class TestOperatorRecordBatch extends SubOperatorTest { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SubOperatorTest.class); + + /** + * Mock operator executor that simply tracks each method call + * and provides a light-weight vector container. Returns a + * defined number of (batches) with an optional schema change. + */ + + private class MockOperatorExec implements OperatorExec { + + public boolean bindCalled; + public boolean buildSchemaCalled; + public int nextCalls = 1; + public int nextCount; + public int schemaChangeAt = -1; + public boolean cancelCalled; + public boolean closeCalled; + public boolean schemaEOF; + private final VectorContainerAccessor batchAccessor; + + public MockOperatorExec() { + this(mockBatch()); + } + + public MockOperatorExec(VectorContainer container) { + batchAccessor = new VectorContainerAccessor(); + batchAccessor.setContainer(container); + } + + public MockOperatorExec(VectorContainerAccessor accessor) { + batchAccessor = accessor; + } + + @Override + public void bind(OperatorContext context) { bindCalled = true; } + + @Override + public BatchAccessor batchAccessor() { + return batchAccessor; + } + + @Override + public boolean buildSchema() { buildSchemaCalled = true; return ! 
schemaEOF; } + + @Override + public boolean next() { + nextCount++; + if (nextCount > nextCalls) { + return false; + } + if (nextCount == schemaChangeAt) { + BatchSchema newSchema = new SchemaBuilder(batchAccessor.getSchema()) + .add("b", MinorType.VARCHAR) + .build(); + VectorContainer newContainer = new VectorContainer(fixture.allocator(), newSchema); + batchAccessor.setContainer(newContainer); + } + return true; + } + + @Override + public void cancel() { cancelCalled = true; } + + @Override + public void close() { + batchAccessor().getOutgoingContainer().clear(); + closeCalled = true; + } + } + + private static VectorContainer mockBatch() { + VectorContainer container = new VectorContainer(fixture.allocator(), new SchemaBuilder() + .add("a", MinorType.INT) + .build()); + container.buildSchema(SelectionVectorMode.NONE); + return container; + } + + private OperatorRecordBatch makeOpBatch(MockOperatorExec opExec) { + // Dummy operator definition + PhysicalOperator popConfig = new Limit(null, 0, 100); + return new OperatorRecordBatch(fixture.getFragmentContext(), popConfig, opExec); + } + + /** + * Simulate a normal run: return some batches, encounter a schema change. 
+ */ + + @Test + public void testNormalLifeCycle() { + MockOperatorExec opExec = new MockOperatorExec(); + opExec.nextCalls = 2; + opExec.schemaChangeAt = 2; + try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) { + + assertSame(fixture.getFragmentContext(), opBatch.fragmentContext()); + assertNotNull(opBatch.getContext()); + + // First call to next() builds schema + + assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next()); + assertTrue(opExec.bindCalled); + assertTrue(opExec.buildSchemaCalled); + assertEquals(0, opExec.nextCount); + + // Second call returns the first batch + + assertEquals(IterOutcome.OK, opBatch.next()); + assertEquals(1, opExec.nextCount); + + // Third call causes a schema change + + assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next()); + assertEquals(2, opExec.nextCount); + + // Fourth call reaches EOF + + assertEquals(IterOutcome.NONE, opBatch.next()); + assertEquals(3, opExec.nextCount); + + // Close + } catch (Exception e) { + fail(); + } + + assertTrue(opExec.closeCalled); + assertFalse(opExec.cancelCalled); + } + + /** + * Simulate a truncated life cycle: next() is never called. Not a valid part + * of the protocol; but should be ready anyway. + */ + + @Test + public void testTruncatedLifeCycle() { + MockOperatorExec opExec = new MockOperatorExec(); + opExec.schemaEOF = true; + + try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) { + } catch (Exception e) { + fail(); + } + assertTrue(opExec.bindCalled); + assertTrue(opExec.closeCalled); + } + + /** + * Simulate reaching EOF when trying to create the schema. + */ + + @Test + public void testSchemaEOF() { + MockOperatorExec opExec = new MockOperatorExec(); + opExec.schemaEOF = true; + + try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) { + assertEquals(IterOutcome.NONE, opBatch.next()); + assertTrue(opExec.buildSchemaCalled); + } catch (Exception e) { + fail(); + } + assertTrue(opExec.closeCalled); + } + + /** + * Simulate reaching EOF on the first batch. 
This simulated data source + * discovered a schema, but had no data. + */ + + @Test + public void testFirstBatchEOF() { + MockOperatorExec opExec = new MockOperatorExec(); + opExec.nextCalls = 0; + + try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) { + assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next()); + assertTrue(opExec.buildSchemaCalled); + assertEquals(IterOutcome.NONE, opBatch.next()); + assertEquals(1, opExec.nextCount); + } catch (Exception e) { + fail(); + } + assertTrue(opExec.closeCalled); + } + + /** + * Simulate the caller failing the operator before getting the schema. + */ + + @Test + public void testFailEarly() { + MockOperatorExec opExec = new MockOperatorExec(); + opExec.nextCalls = 2; + + try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) { + opBatch.kill(false); + assertFalse(opExec.buildSchemaCalled); + assertEquals(0, opExec.nextCount); + assertFalse(opExec.cancelCalled); + } catch (Exception e) { + fail(); + } + assertTrue(opExec.closeCalled); + } + + /** + * Simulate the caller failing the operator before EOF. + */ + + @Test + public void testFailWhileReading() { + MockOperatorExec opExec = new MockOperatorExec(); + opExec.nextCalls = 2; + + try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) { + assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next()); + assertEquals(IterOutcome.OK, opBatch.next()); + opBatch.kill(false); + assertTrue(opExec.cancelCalled); + } catch (Exception e) { + fail(); + } + assertTrue(opExec.closeCalled); + } + + /** + * Simulate the caller failing the operator after EOF but before close. + * This is a silly time to fail, but have to handle it anyway. 
+ */ + + @Test + public void testFailBeforeClose() { + MockOperatorExec opExec = new MockOperatorExec(); + opExec.nextCalls = 2; + + try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) { + assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next()); + assertEquals(IterOutcome.OK, opBatch.next()); + assertEquals(IterOutcome.OK, opBatch.next()); + assertEquals(IterOutcome.NONE, opBatch.next()); + opBatch.kill(false); + + // Already hit EOF, so fail won't be passed along. + + assertFalse(opExec.cancelCalled); + } catch (Exception e) { + fail(); + } + assertTrue(opExec.closeCalled); + } + + /** + * Simulate the caller failing the operator after close. + * This violates the operator protocol, but have to handle it anyway. + */ + + @Test + public void testFailAfterClose() { + MockOperatorExec opExec = new MockOperatorExec(); + opExec.nextCalls = 2; + + OperatorRecordBatch opBatch = makeOpBatch(opExec); + assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next()); + assertEquals(IterOutcome.OK, opBatch.next()); + assertEquals(IterOutcome.OK, opBatch.next()); + assertEquals(IterOutcome.NONE, opBatch.next()); + try { + opBatch.close(); + } catch (Exception e) { + fail(); + } + assertTrue(opExec.closeCalled); + opBatch.kill(false); + assertFalse(opExec.cancelCalled); + } + + /** + * The record batch abstraction has a bunch of methods to work with a vector container. + * Rather than simply exposing the container itself, the batch instead exposes various + * container operations. Probably an artifact of its history. In any event, make + * sure those methods are passed through to the container accessor. 
+ */ + + @Test + public void testBatchAccessor() { + BatchSchema schema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .build(); + SingleRowSet rs = fixture.rowSetBuilder(schema) + .addRow(10, "fred") + .addRow(20, "wilma") + .build(); + MockOperatorExec opExec = new MockOperatorExec(rs.container()); + opExec.nextCalls = 1; + + try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) { + assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next()); + assertEquals(schema, opBatch.getSchema()); + assertEquals(2, opBatch.getRecordCount()); + assertSame(rs.container(), opBatch.getOutgoingContainer()); + + Iterator<VectorWrapper<?>> iter = opBatch.iterator(); + assertEquals("a", iter.next().getValueVector().getField().getName()); + assertEquals("b", iter.next().getValueVector().getField().getName()); + + // Not a full test of the schema path; just make sure that the + // pass-through to the Vector Container works. + + SchemaPath path = SchemaPath.create(NamePart.newBuilder().setName("a").build()); + TypedFieldId id = opBatch.getValueVectorId(path); + assertEquals(MinorType.INT, id.getFinalType().getMinorType()); + assertEquals(1, id.getFieldIds().length); + assertEquals(0, id.getFieldIds()[0]); + + path = SchemaPath.create(NamePart.newBuilder().setName("b").build()); + id = opBatch.getValueVectorId(path); + assertEquals(MinorType.VARCHAR, id.getFinalType().getMinorType()); + assertEquals(1, id.getFieldIds().length); + assertEquals(1, id.getFieldIds()[0]); + + // Sanity check of getValueAccessorById() + + VectorWrapper<?> w = opBatch.getValueAccessorById(IntVector.class, 0); + assertNotNull(w); + assertEquals("a", w.getValueVector().getField().getName()); + w = opBatch.getValueAccessorById(VarCharVector.class, 1); + assertNotNull(w); + assertEquals("b", w.getValueVector().getField().getName()); + + // getWritableBatch() ? 
+ + // No selection vectors + + try { + opBatch.getSelectionVector2(); + fail(); + } catch (UnsupportedOperationException e) { + // Expected + } + try { + opBatch.getSelectionVector4(); + fail(); + } catch (UnsupportedOperationException e) { + // Expected + } + + } catch (Exception e) { + fail(e.getMessage()); + } + assertTrue(opExec.closeCalled); + } + + @Test + public void testSchemaChange() { + BatchSchema schema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .build(); + SingleRowSet rs = fixture.rowSetBuilder(schema) + .addRow(10, "fred") + .addRow(20, "wilma") + .build(); + VectorContainer container = rs.container(); + MockOperatorExec opExec = new MockOperatorExec(container); + int schemaVersion = opExec.batchAccessor().schemaVersion(); + + // Be tidy: start at 1. + + assertEquals(1, schemaVersion); + + // Changing data does not trigger schema change + + container.zeroVectors(); + opExec.batchAccessor.setContainer(container); + assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion()); + + // Different container, same vectors, does not trigger a change + + VectorContainer c2 = new VectorContainer(fixture.allocator()); + for (VectorWrapper<?> vw : container) { + c2.add(vw.getValueVector()); + } + c2.buildSchema(SelectionVectorMode.NONE); + opExec.batchAccessor.setContainer(c2); + assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion()); + + opExec.batchAccessor.setContainer(container); + assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion()); + + // Replacing a vector with another of the same type does trigger + // a change. 
+ + VectorContainer c3 = new VectorContainer(fixture.allocator()); + c3.add(container.getValueVector(0).getValueVector()); + c3.add(TypeHelper.getNewVector( + container.getValueVector(1).getValueVector().getField(), + fixture.allocator(), null)); + c3.buildSchema(SelectionVectorMode.NONE); + opExec.batchAccessor.setContainer(c3); + assertEquals(schemaVersion + 1, opExec.batchAccessor().schemaVersion()); + schemaVersion = opExec.batchAccessor().schemaVersion(); + + // No change if same schema again + + opExec.batchAccessor.setContainer(c3); + assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion()); + + // Adding a vector triggers a change + + MaterializedField c = SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.OPTIONAL); + c3.add(TypeHelper.getNewVector(c, fixture.allocator(), null)); + c3.buildSchema(SelectionVectorMode.NONE); + opExec.batchAccessor.setContainer(c3); + assertEquals(schemaVersion + 1, opExec.batchAccessor().schemaVersion()); + schemaVersion = opExec.batchAccessor().schemaVersion(); + + // No change if same schema again + + opExec.batchAccessor.setContainer(c3); + assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion()); + + // Removing a vector triggers a change + + c3.remove(c3.getValueVector(2).getValueVector()); + c3.buildSchema(SelectionVectorMode.NONE); + assertEquals(2, c3.getNumberOfColumns()); + opExec.batchAccessor.setContainer(c3); + assertEquals(schemaVersion + 1, opExec.batchAccessor().schemaVersion()); + schemaVersion = opExec.batchAccessor().schemaVersion(); + + // Clean up + + opExec.close(); + c2.clear(); + c3.clear(); + } + + /** + * Test that an SV2 is properly handled by the proper container accessor. 
+ */ + + @Test + public void testSv2() { + BatchSchema schema = new SchemaBuilder() + .add("a", MinorType.INT) + .add("b", MinorType.VARCHAR) + .build(); + SingleRowSet rs = fixture.rowSetBuilder(schema) + .addRow(10, "fred") + .addRow(20, "wilma") + .withSv2() + .build(); + + ContainerAndSv2Accessor accessor = new ContainerAndSv2Accessor(); + accessor.setContainer(rs.container()); + accessor.setSelectionVector(rs.getSv2()); + + MockOperatorExec opExec = new MockOperatorExec(accessor); + opExec.nextCalls = 1; + + try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) { + assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next()); + assertSame(rs.getSv2(), opBatch.getSelectionVector2()); + + } catch (Exception e) { + fail(); + } + assertTrue(opExec.closeCalled); + + // Must release SV2 + + rs.clear(); + } + + //----------------------------------------------------------------------- + // Exception error cases + // + // Assumes that any of the operator executor methods could throw an + // exception. A wise implementation will throw a user exception that the + // operator just passes along. A lazy implementation will throw any old + // unchecked exception. Validate both cases. + + public static final String ERROR_MSG = "My Bad!"; + + /** + * Failure on the bind method. 
+ */ + + @Test + public void testWrappedExceptionOnBind() { + MockOperatorExec opExec = new MockOperatorExec() { + @Override + public void bind(OperatorContext context) { + throw new IllegalStateException(ERROR_MSG); + } + }; + try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) { + fail(); + } catch (UserException e) { + assertTrue(e.getMessage().contains(ERROR_MSG)); + assertTrue(e.getCause() instanceof IllegalStateException); + } catch (Throwable t) { + fail(); + } + assertFalse(opExec.cancelCalled); // Cancel not called: too early in life + assertFalse(opExec.closeCalled); // Same with close + } + + @Test + public void testUserExceptionOnBind() { + MockOperatorExec opExec = new MockOperatorExec() { + @Override + public void bind(OperatorContext context) { + throw UserException.connectionError() + .message(ERROR_MSG) + .build(logger); + } + }; + try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) { + opBatch.next(); + fail(); + } catch (UserException e) { + assertTrue(e.getMessage().contains(ERROR_MSG)); + assertNull(e.getCause()); + } catch (Throwable t) { + fail(); + } + assertFalse(opExec.cancelCalled); // Cancel not called: too early in life + assertFalse(opExec.closeCalled); // Same with close + } + + /** + * Failure when building the schema (first call to next()). 
+ */ + @Test + public void testWrappedExceptionOnBuildSchema() { + MockOperatorExec opExec = new MockOperatorExec() { + @Override + public boolean buildSchema() { + throw new IllegalStateException(ERROR_MSG); + } + }; + try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) { + opBatch.next(); + fail(); + } catch (UserException e) { + assertTrue(e.getMessage().contains(ERROR_MSG)); + assertTrue(e.getCause() instanceof IllegalStateException); + } catch (Throwable t) { + fail(); + } + assertTrue(opExec.cancelCalled); + assertTrue(opExec.closeCalled); + } + + @Test + public void testUserExceptionOnBuildSchema() { + MockOperatorExec opExec = new MockOperatorExec() { + @Override + public boolean buildSchema() { + throw UserException.dataReadError() + .message(ERROR_MSG) + .build(logger); + } + }; + try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) { + opBatch.next(); + fail(); + } catch (UserException e) { + assertTrue(e.getMessage().contains(ERROR_MSG)); + assertNull(e.getCause()); + } catch (Throwable t) { + fail(); + } + assertTrue(opExec.cancelCalled); + assertTrue(opExec.closeCalled); + } + + /** + * Failure on the second or subsequent calls to next(), when actually + * fetching a record batch. 
+ */ + + @Test + public void testWrappedExceptionOnNext() { + MockOperatorExec opExec = new MockOperatorExec() { + @Override + public boolean next() { + throw new IllegalStateException(ERROR_MSG); + } + }; + try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) { + assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next()); + opBatch.next(); + fail(); + } catch (UserException e) { + assertTrue(e.getMessage().contains(ERROR_MSG)); + assertTrue(e.getCause() instanceof IllegalStateException); + } catch (Throwable t) { + fail(); + } + assertTrue(opExec.cancelCalled); + assertTrue(opExec.closeCalled); + } + + @Test + public void testUserExceptionOnNext() { + MockOperatorExec opExec = new MockOperatorExec() { + @Override + public boolean next() { + throw UserException.dataReadError() + .message(ERROR_MSG) + .build(logger); + } + }; + try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) { + assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next()); + opBatch.next(); + fail(); + } catch (UserException e) { + assertTrue(e.getMessage().contains(ERROR_MSG)); + assertNull(e.getCause()); + } catch (Throwable t) { + fail(); + } + assertTrue(opExec.cancelCalled); + assertTrue(opExec.closeCalled); + } + + /** + * Failure when closing the operator implementation. 
+ */ + + @Test + public void testWrappedExceptionOnClose() { + MockOperatorExec opExec = new MockOperatorExec() { + @Override + public void close() { + // Release memory + super.close(); + // Then fail + throw new IllegalStateException(ERROR_MSG); + } + }; + opExec.nextCalls = 1; + try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) { + assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next()); + assertEquals(IterOutcome.OK, opBatch.next()); + assertEquals(IterOutcome.NONE, opBatch.next()); + } catch (UserException e) { + assertTrue(e.getMessage().contains(ERROR_MSG)); + assertTrue(e.getCause() instanceof IllegalStateException); + } catch (Throwable t) { + fail(); + } + assertFalse(opExec.cancelCalled); + assertTrue(opExec.closeCalled); + } + + @Test + public void testUserExceptionOnClose() { + MockOperatorExec opExec = new MockOperatorExec() { + @Override + public void close() { + // Release memory + super.close(); + // Then fail + throw UserException.dataReadError() + .message(ERROR_MSG) + .build(logger); + } + }; + try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) { + assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next()); + assertEquals(IterOutcome.OK, opBatch.next()); + assertEquals(IterOutcome.NONE, opBatch.next()); + } catch (UserException e) { + assertTrue(e.getMessage().contains(ERROR_MSG)); + assertNull(e.getCause()); + } catch (Throwable t) { + fail(); + } + assertFalse(opExec.cancelCalled); + assertTrue(opExec.closeCalled); + } +}