arina-ielchiieva commented on a change in pull request #1501: DRILL-6791: Scan 
projection framework
URL: https://github.com/apache/drill/pull/1501#discussion_r233452637
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/project/ResolvedTuple.java
 ##########
 @@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.scan.project;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.rowSet.ResultVectorCache;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.vector.UInt4Vector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.RepeatedMapVector;
+
+import 
org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Drill rows are made up of a tree of tuples, with the row being the root
+ * tuple. Each tuple contains columns, some of which may be maps. This
+ * class represents each row or map in the output projection.
+ * <p>
+ * Output columns within the tuple can be projected from the data source,
+ * might be null (requested columns that don't match a data source column)
+ * or might be a constant (such as an implicit column.) This class
+ * orchestrates assembling an output tuple from a collection of these
+ * three column types. (Though implicit columns appear only in the root
+ * tuple.)
+ *
+ * <h4>Null Handling</h4>
+ *
+ * The project list might reference a "missing" map if the project list
+ * includes, say, <tt>SELECT a.b.c</tt> but <tt>`a`</tt> does not exist
+ * in the data source. In this case, the column a is implied to be a map,
+ * so the projection mechanism will create a null map for <tt>`a`</tt>
+ * and <tt>`b`</tt>, and will create a null column for <tt>`c`</tt>.
+ * <p>
+ * To accomplish this recursive null processing, each tuple is associated
+ * with a null builder. (The null builder can be null if projection is
+ * implicit with a wildcard; in such a case no null columns can occur.
+ * But, even here, with schema persistence, a <tt>SELECT *</tt> query
+ * may need null columns if a second file does not contain a column
+ * that appeared in a first file.)
+ * <p>
+ * The null builder is bound to each tuple to allow vector persistence
+ * via the result vector cache. If we must create a null column
+ * <tt>`x`</tt> in two different readers, then the rules of Drill
+ * require that the same vector be used for both (or else a schema
+ * change is signaled.) The vector cache works by name (and type).
+ * Since maps may contain columns with the same names as other maps,
+ * the vector cache must be associated with each tuple. And, by extension,
+ * the null builder must also be associated with each tuple.
+ *
+ * <h4>Lifecycle</h4>
+ *
+ * The lifecycle of a resolved tuple is:
+ * <ul>
+ * <li>The projection mechanism creates the output tuple, and its columns,
+ * by comparing the project list against the table schema. The result is
+ * a set of table, null, or constant columns.</li>
+ * <li>Once per schema change, the resolved tuple creates the output
+ * tuple by linking to vectors in their original locations. As it turns out,
+ * we can simply share the vectors; we don't need to transfer the buffers.</li>
+ * <li>To prepare for the transfer, the tuple asks the null column builder
+ * (if present) to build the required null columns.</li>
+ * <li>Once the output tuple is built, it can be used for any number of
+ * batches without further work. (The same vectors appear in the various inputs
+ * and the output, eliminating the need for any transfers.)</li>
+ * <li>Once per batch, the client must set the row count. This is needed for 
the
+ * output container, and for any "null" maps that the project may have 
created.</li>
+ * </ul>
+ *
+ * <h4>Projection Mapping</h4>
+ *
+ * Each column is is mapped into the output tuple (vector container or map) in
+ * the order that the columns are defined here. (That order follows the project
+ * list for explicit projection, or the table schema for implicit projection.)
+ * The source, however, may be in any order (at least for the table schema.)
+ * A projection mechanism identifies the {@link VectorSource} that supplies the
+ * vector for the column, along with the vector's index within that source.
+ * The resolved tuple is bound to an output tuple. The projection mechanism
+ * grabs the input vector from the vector source at the indicated position, and
+ * links it into the output tuple, represented by this projected tuple, at the
+ * position of the resolved column in the child list.
+ *
+ * <h4>Caveats</h4>
+ *
+ * The project mechanism handles nested "missing" columns as mentioned
+ * above. This works to create null columns within maps that are defined by the
+ * data source. However, the mechanism does not currently handle creating null
+ * columns within repeated maps or lists. Doing so is possible, but requires
+ * adding a level of cardinality computation to create the proper number of
+ * "inner" values.
+ */
+
+public abstract class ResolvedTuple implements VectorSource {
+
+  /**
+   * Represents the top-level tuple which is projected to a
+   * vector container.
+   */
+
+  public static class ResolvedRow extends ResolvedTuple {
+
+    private VectorContainer input;
+    private VectorContainer output;
+
+    public ResolvedRow(NullColumnBuilder nullBuilder) {
+      super(nullBuilder);
+    }
+
+    public void project(VectorContainer input, VectorContainer output) {
+      this.input = input;
+      this.output = output;
+      output.removeAll();
+      buildColumns();
+      output.buildSchema(SelectionVectorMode.NONE);
+    }
+
+    @Override
+    public ValueVector vector(int index) {
+      return input.getValueVector(index).getValueVector();
+    }
+
+    @Override
+    public void addVector(ValueVector vector) {
+      output.add(vector);
+    }
+
+    @Override
+    public void setRowCount(int rowCount) {
+      output.setRecordCount(rowCount);
+      cascadeRowCount(rowCount);
+    }
+
+    @Override
+    public BufferAllocator allocator() {
+      return output.getAllocator();
+    }
+
+    @Override
+    public String name() {
+
+      // Never used in production, just for debugging.
+
+       return "$root$";
+    }
+
+    public VectorContainer output() { return output; }
+
+    @Override
+    public int innerCardinality(int rowCount) { return rowCount; }
+  }
+
+  /**
+   * Represents a map implied by the project list, whether or not the map
+   * actually appears in the table schema.
+   * The column is implied to be a map because it contains
+   * children. This implementation builds the map and its children.
+   */
+
+  public static abstract class ResolvedMap extends ResolvedTuple {
+
+    protected final ResolvedMapColumn parentColumn;
+    protected AbstractMapVector inputMap;
+    protected AbstractMapVector outputMap;
+
+    public ResolvedMap(ResolvedMapColumn parentColumn) {
+      super(parentColumn.parent().nullBuilder == null ? null : 
parentColumn.parent().nullBuilder.newChild());
+      this.parentColumn = parentColumn;
+    }
+
+    @Override
+    public void addVector(ValueVector vector) {
+      outputMap.putChild(vector.getField().getName(), vector);
+    }
+
+    @Override
+    public ValueVector vector(int index) {
+      assert inputMap != null;
+      return inputMap.getChildByOrdinal(index);
+    }
+
+    public AbstractMapVector buildMap() {
+      if (parentColumn.sourceIndex() != -1) {
+        final ResolvedTuple parentTuple = parentColumn.parent();
+        inputMap = (AbstractMapVector) 
parentTuple.vector(parentColumn.sourceIndex());
+      }
+      final MaterializedField colSchema = parentColumn.schema();
+      outputMap = createMap(inputMap,
+          MaterializedField.create(
+              colSchema.getName(), colSchema.getType()),
+          parentColumn.parent().allocator());
+      buildColumns();
+      return outputMap;
+    }
+
+    protected abstract AbstractMapVector createMap(AbstractMapVector inputMap,
+        MaterializedField create, BufferAllocator allocator);
+
+    @Override
+    public BufferAllocator allocator() {
+      return outputMap.getAllocator();
+    }
+
+    @Override
+    public String name() { return parentColumn.name(); }
+  }
+
+  public static class ResolvedSingleMap extends ResolvedMap {
+
+    public ResolvedSingleMap(ResolvedMapColumn parentColumn) {
+      super(parentColumn);
+    }
+
+    @Override
+    protected AbstractMapVector createMap(AbstractMapVector inputMap,
+        MaterializedField schema, BufferAllocator allocator) {
+      return new MapVector(schema,
+          allocator, null);
+    }
+
+    @Override
+    public void setRowCount(int rowCount) {
+      ((MapVector) outputMap).setMapValueCount(rowCount);
+      cascadeRowCount(rowCount);
+    }
+
+    @Override
+    public int innerCardinality(int outerCardinality) {
+      return outerCardinality;
+    }
+  }
+
+  /**
+   * Represents a map tuple (not the map column, rather the value of the
+   * map column.) When projecting, we create a new repeated map vector,
+   * but share the offsets vector from input to output. The size of the
+   * offset vector reveals the number of elements in the "inner" array,
+   * which is the number of null values to create if null columns are
+   * added.
+   */
+
+  public static class ResolvedMapArray extends ResolvedMap {
+
+    private int valueCount;
+
+    public ResolvedMapArray(ResolvedMapColumn parentColumn) {
+      super(parentColumn);
+    }
+
+    @Override
+    protected AbstractMapVector createMap(AbstractMapVector inputMap,
+        MaterializedField schema, BufferAllocator allocator) {
+
+      // Create a new map array, reusing the offset vector from
+      // the original input map.
+
+      final RepeatedMapVector source = (RepeatedMapVector) inputMap;
+      final UInt4Vector offsets = source.getOffsetVector();
+      valueCount = offsets.getAccessor().getValueCount();
+      return new RepeatedMapVector(schema,
+          offsets, null);
+    }
+
+    @Override
+    public int innerCardinality(int outerCardinality) {
+      return valueCount;
+    }
+
+    @Override
+    public void setRowCount(int rowCount) {
+      cascadeRowCount(valueCount);
+    }
+  }
+
+  protected final List<ResolvedColumn> members = new ArrayList<>();
+  protected final NullColumnBuilder nullBuilder;
+  protected List<ResolvedTuple> children;
+  protected VectorSource binding;
+
+  public ResolvedTuple(NullColumnBuilder nullBuilder) {
+    this.nullBuilder = nullBuilder;
+  }
+
+  public NullColumnBuilder nullBuilder() {
+    return nullBuilder;
+  }
+
+  public void add(ResolvedColumn col) {
+    members.add(col);
+  }
+
+  public void addChild(ResolvedTuple child) {
+    if (children == null) {
+      children = new ArrayList<>();
+    }
+    children.add(child);
+  }
+
+  public void removeChild(ResolvedTuple child) {
+    assert ! children.isEmpty() && children.get(children.size()-1) == child;
+    children.remove(children.size()-1);
+  }
+
+  public boolean isSimpleProjection() {
+    if (children != null && ! children.isEmpty()) {
+      return false;
+    }
+    for (int i = 0; i < members.size(); i++) {
+      if (members.get(i).nodeType() == ResolvedNullColumn.ID) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @VisibleForTesting
+  public List<ResolvedColumn> columns() { return members; }
+
+  public void buildNulls(ResultVectorCache vectorCache) {
+    if (nullBuilder != null) {
+      nullBuilder.build(vectorCache);
+    }
+    if (children != null) {
+      for (final ResolvedTuple child : children) {
+        child.buildNulls(vectorCache.childCache(child.name()));
+      }
+    }
+  }
+
+  public void loadNulls(int rowCount) {
+    if (nullBuilder != null) {
+      nullBuilder.load(rowCount);
+    }
+    if (children != null) {
+      for (final ResolvedTuple child : children) {
+        child.loadNulls(innerCardinality(rowCount));
+      }
+    }
+  }
+
+  public abstract int innerCardinality(int outerCardinality);
+
+  /**
+   * Merge two or more batches to produce an output batch. For example, 
consider
+   * two input batches:<pre><code>
+   * (a, d, e)
+   * (c, b)</code></pre>
+   * We may wish to merge them by projecting columns into an output batch
+   * of the form:<pre><code>
+   * (a, b, c, d)</code></pre>
+   * It is not necessary to project all columns from the inputs, but all
+   * columns in the output must have a projection.
+   * <p>
+   * The merger is created once per schema, then can be reused for any
+   * number of batches. The only restriction is that the two batches must
 
 Review comment:
   Why two batches should have the same number of rows?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to