paul-rogers commented on a change in pull request #1618: DRILL-6950: Row 
set-based scan framework
URL: https://github.com/apache/drill/pull/1618#discussion_r251218444
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ScanOperatorExec.java
 ##########
 @@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.protocol.BatchAccessor;
+import org.apache.drill.exec.physical.impl.protocol.OperatorExec;
+import org.apache.drill.exec.physical.impl.protocol.VectorContainerAccessor;
+
+import 
org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Implementation of the revised scan operator that uses a mutator aware of
+ * batch sizes. This is the successor to {@link ScanBatch} and should be used
+ * by all new scan implementations.
+ *
+ * <h4>Scanner Framework</h4>
+ *
+ * Acts as an adapter between the operator protocol and the row reader
+ * protocol.
+ * <p>
+ * The scan operator itself is simply a framework for handling a set of 
readers;
+ * it knows nothing other than the interfaces of the components it works with;
+ * delegating all knowledge of schemas, projection, reading and the like to
+ * implementations of those interfaces. Because that work is complex, a set
+ * of frameworks exist to handle most common use cases, but a specialized 
reader
+ * can create a framework or reader from scratch.
+ * <p>
+ * Error handling in this class is minimal: the enclosing record batch iterator
+ * is responsible for handling exceptions. Error handling relies on the fact
+ * that the iterator will call <tt>close()</tt> regardless of which exceptions
+ * are thrown.
+ *
+ * <h4>Protocol</h4>
+ *
+ * The scanner works directly with two other interfaces
+ * <p>
+ * The {@link ScanOperatorEvents} implementation provides the set of readers to
+ * use. This class can simply maintain a list, or can create the reader on
+ * demand.
+ * <p>
+ * More subtly, the factory also handles projection issues and manages vectors
+ * across subsequent readers. A number of factories are available for the most
+ * common cases. Extend these to implement a version specific to a data source.
+ * <p>
+ * The {@link RowBatchReader} is a surprisingly minimal interface that
+ * nonetheless captures the essence of reading a result set as a set of 
batches.
+ * The factory implementations mentioned above implement this interface to 
provide
+ * commonly-used services, the most important of which is access to a
+ * {#link ResultSetLoader} to write values into value vectors.
+ *
+ * <h4>Schema Versions</h4>
+ * Readers may change schemas from time to time. To track such changes,
+ * this implementation tracks a batch schema version, maintained by comparing
+ * one schema with the next.
+ * <p>
+ * Readers can discover columns as they read data, such as with any JSON-based
+ * format. In this case, the row set mutator also provides a schema version,
+ * but a fine-grained one that changes each time a column is added.
+ * <p>
+ * The two schema versions serve different purposes and are not 
interchangeable.
+ * For example, if a scan reads two files, both will build up their own 
schemas,
+ * each increasing its internal version number as work proceeds. But, at the
+ * end of each batch, the schemas may (and, in fact, should) be identical,
+ * which is the schema version downstream operators care about.
+ */
+
+public class ScanOperatorExec implements OperatorExec {
+
+  private enum State { START, READER, END, FAILED, CLOSED }
+
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ScanOperatorExec.class);
+
+  private final ScanOperatorEvents factory;
+  protected final VectorContainerAccessor containerAccessor = new 
VectorContainerAccessor();
+  private State state = State.START;
+  protected OperatorContext context;
+  private int readerCount;
+  private ReaderState readerState;
+
+  public ScanOperatorExec(ScanOperatorEvents factory) {
+    this.factory = factory;
+  }
+
+  @Override
+  public void bind(OperatorContext context) {
+    this.context = context;
+    factory.bind(context);
+  }
+
+  @Override
+  public BatchAccessor batchAccessor() { return containerAccessor; }
+
+  @VisibleForTesting
+  public OperatorContext context() { return context; }
+
+  @Override
+  public boolean buildSchema() {
+    assert state == State.START;
+
+    // Spin though readers looking for the first that has enough data
+    // to provide a schema. Skips empty, missing or otherwise "null"
+    // readers.
+
+    nextAction(true);
+    if (state != State.END) {
+      return true;
+    }
+
+    // Reader count check done here because readers are passed as
+    // an iterator, not list. We don't know the count until we've
+    // seen EOF from the iterator.
+
+    if (readerCount == 0) {
+      // return false; // When empty batches are supported
+      throw UserException.executionError( // TODO: Test this path
+          new ExecutionSetupException("A scan batch must contain at least one 
reader."))
+        .build(logger);
+    }
+    return false;
+  }
+
+  @Override
+  public boolean next() {
+    try {
+      switch (state) {
+
+      case READER:
+        // Read another batch from the list of row readers. Keeps opening,
+        // reading from, and closing readers as needed to locate a batch, or
+        // until all readers are exhausted. Terminates when a batch is read,
+        // or all readers are exhausted.
+
+        nextAction(false);
+        return state != State.END;
+
+      case END:
+        return false;
+
+      default:
+        throw new IllegalStateException("Unexpected state: " + state);
+      }
+    } catch(final Throwable t) {
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to