[ 
https://issues.apache.org/jira/browse/DRILL-7503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010774#comment-17010774
 ] 

ASF GitHub Bot commented on DRILL-7503:
---------------------------------------

arina-ielchiieva commented on pull request #1944: DRILL-7503: Refactor the 
project operator
URL: https://github.com/apache/drill/pull/1944#discussion_r364289943
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java
 ##########
 @@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.project;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.collections.map.CaseInsensitiveMap;
+import org.apache.drill.common.expression.ConvertExpression;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.FunctionCallFactory;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.PathSegment.NameSegment;
+import org.apache.drill.common.expression.fn.FunctionReplacementUtils;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.DrillFuncHolderExpr;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.ValueVectorWriteExpression;
+import org.apache.drill.exec.expr.fn.FunctionLookupContext;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.planner.StarColumnHelper;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.ColumnExplorer;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.carrotsearch.hppc.IntHashSet;
+
+/**
+ * Plans the projection given the incoming and requested outgoing schemas. 
Works
+ * with the {@link VectorState} to create required vectors, writers and so on.
+ * Populates the code generator with the "projector" expressions.
+ */
+class ProjectionMaterializer {
+  private static final Logger logger = 
LoggerFactory.getLogger(ProjectionMaterializer.class);
+  private static final String EMPTY_STRING = "";
+
+  /**
+   * Abstracts the physical vector setup operations to separate
+   * the physical setup, in <code>ProjectRecordBatch</code>, from the
+   * logical setup in the materializer class.
+   */
+  public interface BatchBuilder {
+    void addTransferField(String name, ValueVector vvIn);
+    ValueVectorWriteExpression addOutputVector(String name, LogicalExpression 
expr);
+    int addDirectTransfer(FieldReference ref, ValueVectorReadExpression 
vectorRead);
+    void addComplexField(FieldReference ref);
+    ValueVectorWriteExpression addEvalVector(String outputName,
+        LogicalExpression expr);
+  }
+
+  private static class ClassifierResult {
+    private boolean isStar;
+    private List<String> outputNames;
+    private String prefix = "";
+    private final HashMap<String, Integer> prefixMap = Maps.newHashMap();
+    private final CaseInsensitiveMap outputMap = new CaseInsensitiveMap();
+    private final CaseInsensitiveMap sequenceMap = new CaseInsensitiveMap();
+
+    private void clear() {
+      isStar = false;
+      prefix = "";
+      if (outputNames != null) {
+        outputNames.clear();
+      }
+
+      // note: don't clear the internal maps since they have cumulative data..
+    }
+  }
+
+  private final ClassGenerator<Projector> cg;
+  private final VectorAccessible incomingBatch;
+  private final BatchSchema incomingSchema;
+  private final List<NamedExpression> exprSpec;
+  private final FunctionLookupContext functionLookupContext;
+  private final BatchBuilder batchBuilder;
+  private final boolean unionTypeEnabled;
+  private final ErrorCollector collector = new ErrorCollectorImpl();
+  private final ColumnExplorer columnExplorer;
+  private final IntHashSet transferFieldIds = new IntHashSet();
+  private final ProjectionMaterializer.ClassifierResult result = new 
ClassifierResult();
+  private boolean isAnyWildcard;
+  private boolean classify;
+
+  public ProjectionMaterializer(OptionManager options,
+      VectorAccessible incomingBatch, List<NamedExpression> exprSpec,
+      FunctionLookupContext functionLookupContext, BatchBuilder batchBuilder,
+      boolean unionTypeEnabled) {
+    this.incomingBatch = incomingBatch;
+    this.incomingSchema = incomingBatch.getSchema();
+    this.exprSpec = exprSpec;
+    this.functionLookupContext = functionLookupContext;
+    this.batchBuilder = batchBuilder;
+    this.unionTypeEnabled = unionTypeEnabled;
+    columnExplorer = new ColumnExplorer(options);
+    cg = CodeGenerator.getRoot(Projector.TEMPLATE_DEFINITION, options);
+  }
+
+  public Projector generateProjector(FragmentContext context, boolean saveCode)
+      throws ClassTransformationException, IOException, SchemaChangeException {
+    long setupNewSchemaStartTime = System.currentTimeMillis();
+    setup();
+    CodeGenerator<Projector> codeGen = cg.getCodeGenerator();
+    codeGen.plainJavaCapable(true);
+    codeGen.saveCodeForDebugging(saveCode);
+    Projector projector = context.getImplementationClass(codeGen);
+
+    long setupNewSchemaEndTime = System.currentTimeMillis();
+    logger.trace("generateProjector: time {}  ms, Project {}, incoming {}",
+             (setupNewSchemaEndTime - setupNewSchemaStartTime), exprSpec, 
incomingSchema);
+    return projector;
+  }
+
+  private void setup() throws SchemaChangeException {
+    List<NamedExpression> exprs = exprSpec != null ? exprSpec
+        : inferExpressions();
+    isAnyWildcard = isAnyWildcard(exprs);
+    classify = isClassificationNeeded(exprs);
+
+    for (NamedExpression namedExpression : exprs) {
+      setupExpression(namedExpression);
+    }
+  }
+
+  private List<NamedExpression> inferExpressions() {
+    List<NamedExpression> exprs = Lists.newArrayList();
+    for (MaterializedField field : incomingSchema) {
+      String fieldName = field.getName();
+      if (Types.isComplex(field.getType())
+          || Types.isRepeated(field.getType())) {
+        LogicalExpression convertToJson = FunctionCallFactory.createConvert(
+            ConvertExpression.CONVERT_TO, "JSON",
+            SchemaPath.getSimplePath(fieldName), ExpressionPosition.UNKNOWN);
+        String castFuncName = FunctionReplacementUtils
+            .getCastFunc(MinorType.VARCHAR);
+        List<LogicalExpression> castArgs = Lists.newArrayList();
+        castArgs.add(convertToJson); // input_expr
+        // Implicitly casting to varchar, since we don't know actual source
+        // length, cast to undefined length, which will preserve source length
+        castArgs.add(new ValueExpressions.LongExpression(
+            Types.MAX_VARCHAR_LENGTH, null));
+        FunctionCall castCall = new FunctionCall(castFuncName, castArgs,
+            ExpressionPosition.UNKNOWN);
+        exprs.add(new NamedExpression(castCall, new 
FieldReference(fieldName)));
+      } else {
+        exprs.add(new NamedExpression(SchemaPath.getSimplePath(fieldName),
+            new FieldReference(fieldName)));
+      }
+    }
+    return exprs;
+  }
+
+  private boolean isAnyWildcard(List<NamedExpression> exprs) {
+    for (NamedExpression e : exprs) {
+      if (isWildcard(e)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean isWildcard(NamedExpression ex) {
+    if (!(ex.getExpr() instanceof SchemaPath)) {
+      return false;
+    }
+    NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
+    return expr.getPath().contains(SchemaPath.DYNAMIC_STAR);
+  }
+
+  private boolean isClassificationNeeded(List<NamedExpression> exprs) {
+    boolean needed = false;
+    for (NamedExpression ex : exprs) {
+      if (!(ex.getExpr() instanceof SchemaPath)) {
+        continue;
+      }
+      NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
+      NameSegment ref = ex.getRef().getRootSegment();
+      boolean refHasPrefix = ref.getPath()
+          .contains(StarColumnHelper.PREFIX_DELIMITER);
+      boolean exprContainsStar = expr.getPath()
+          .contains(SchemaPath.DYNAMIC_STAR);
+
+      if (refHasPrefix || exprContainsStar) {
+        needed = true;
+        break;
+      }
+    }
+    return needed;
+  }
+
+  private void setupExpression(NamedExpression namedExpression)
+      throws SchemaChangeException {
+    result.clear();
+    if (classify && namedExpression.getExpr() instanceof SchemaPath) {
+      classifyExpr(namedExpression, result);
+
+      if (result.isStar) {
+        setupImplicitColumnRef(namedExpression);
+        return;
+      }
+    } else {
+      // For the columns which do not needed to be classified,
+      // it is still necessary to ensure the output column name is unique
+      result.outputNames = Lists.newArrayList();
+      String outputName = getRef(namedExpression).getRootSegment().getPath(); 
// moved
+                                                                              
// to
+                                                                              
// before
+                                                                              
// the
+                                                                              
// if
 
 Review comment:
   maybe have all this on one line?
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refactor project operator
> -------------------------
>
>                 Key: DRILL-7503
>                 URL: https://issues.apache.org/jira/browse/DRILL-7503
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.17.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>            Priority: Minor
>             Fix For: 1.18.0
>
>
> Work on another ticket revealed that the Project operator ("record batch") 
> has grown quite complex. The setup phase lives in the operator as one huge 
> function. The function combines the "logical" tasks of working out the 
> projection expressions and types, the code gen for those expressions, and the 
> physical setup of vectors.
> The refactoring breaks up the logic so that it is easier to focus on the 
> specific bits of interest.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to