Ben-Zvi closed pull request #1481: DRILL-6763: Codegen optimization of SQL 
functions with constant values
URL: https://github.com/apache/drill/pull/1481
 
 
   

This PR was merged from a forked repository. Because GitHub hides the
original diff of a pull request from a fork once it is merged, the diff
is reproduced below for the sake of provenance:

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java
index 134f90ee1fc..5b33acfb602 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ClassGenerator.java
@@ -22,10 +22,13 @@
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Modifier;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import io.netty.buffer.DrillBuf;
+import org.apache.calcite.util.Pair;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.types.TypeProtos;
@@ -39,7 +42,9 @@
 import org.apache.drill.exec.compile.sig.SignatureHolder;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.fn.WorkspaceReference;
+import org.apache.drill.exec.expr.holders.ValueHolder;
 import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@@ -65,6 +70,7 @@
 
   public static final GeneratorMapping DEFAULT_SCALAR_MAP = GM("doSetup", 
"doEval", null, null);
   public static final GeneratorMapping DEFAULT_CONSTANT_MAP = GM("doSetup", 
"doSetup", null, null);
+  public static final String INNER_CLASS_FIELD_NAME = "innerClassField";
 
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ClassGenerator.class);
 
@@ -76,6 +82,7 @@
   private final Map<String, ClassGenerator<T>> innerClasses = 
Maps.newHashMap();
   private final List<TypedFieldId> workspaceTypes = Lists.newArrayList();
   private final Map<WorkspaceReference, JVar> workspaceVectors = 
Maps.newHashMap();
+  private final Map<Pair<Integer, JVar>, Function<DrillBuf, ? extends 
ValueHolder>> constantVars;
   private final CodeGenerator<T> codeGenerator;
 
   public final JDefinedClass clazz;
@@ -87,6 +94,8 @@
   private LinkedList<SizedJBlock>[] blocks;
   private LinkedList<SizedJBlock>[] oldBlocks;
 
+  private JVar innerClassField;
+
   /**
    * Assumed that field has 3 indexes within the constant pull: index of the 
CONSTANT_Fieldref_info +
    * CONSTANT_Fieldref_info.name_and_type_index + 
CONSTANT_NameAndType_info.name_index.
@@ -135,6 +144,7 @@ public static MappingSet getDefaultMapping() {
     this.evaluationVisitor = eval;
     this.model = model;
     this.optionManager = optionManager;
+    constantVars = new HashMap<>();
 
     blocks = (LinkedList<SizedJBlock>[]) new LinkedList[sig.size()];
     for (int i =0; i < sig.size(); i++) {
@@ -370,6 +380,7 @@ private boolean createNestedClass() {
         innerClassGenerator.maxIndex += index;
         // blocks from the inner class should be used
         setupInnerClassBlocks();
+        innerClassField = clazz.field(JMod.NONE, 
model.ref(innerClassGenerator.clazz.name()), INNER_CLASS_FIELD_NAME);
         return true;
       }
       return innerClassGenerator.createNestedClass();
@@ -425,10 +436,8 @@ private void rotateBlock(BlkCreateMode mode) {
    * Creates methods from the signature {@code sig} with body from the 
appropriate {@code blocks}.
    */
   void flushCode() {
-    JVar innerClassField = null;
     if (innerClassGenerator != null) {
       blocks = oldBlocks;
-      innerClassField = clazz.field(JMod.NONE, 
model.ref(innerClassGenerator.clazz.name()), "innerClassField");
       innerClassGenerator.flushCode();
     }
     int i = 0;
@@ -531,11 +540,48 @@ public JVar declareClassField(String prefix, JType t) {
 
   public JVar declareClassField(String prefix, JType t, JExpression init) {
     if (innerClassGenerator != null && hasMaxIndexValue()) {
-      return innerClassGenerator.clazz.field(JMod.NONE, t, prefix + index++, 
init);
+      return innerClassGenerator.declareClassField(prefix, t, init);
     }
     return clazz.field(JMod.NONE, t, prefix + index++, init);
   }
 
+  public Pair<Integer, JVar> declareClassConstField(String prefix, JType t,
+                                                    Function<DrillBuf, ? 
extends ValueHolder> function) {
+    return declareClassConstField(prefix, t, null, function);
+  }
+
+  /**
+   * Declares a constant field for the class.
+   * The argument {@code function} holds the constant value: it returns a value holder
+   * which must be set to the class field when the class instance is created.
+   * The class field innerClassField will be created if innerClassGenerator exists.
+   *
+   * @param prefix the prefix name of class field
+   * @param t the type of class field
+   * @param init init expression
+   * @param function the function holds the constant value
+   * @return a pair of the depth of the nested class and the declared class field
+   */
+  public Pair<Integer, JVar> declareClassConstField(String prefix, JType t, 
JExpression init,
+                                                    Function<DrillBuf, ? 
extends ValueHolder> function) {
+    JVar var;
+    int depth = 1;
+    if (innerClassGenerator != null) {
+      Pair<Integer, JVar> nested = 
innerClassGenerator.declareClassConstField(prefix, t, init, function);
+      depth = nested.getKey() + 1;
+      var = nested.getValue();
+    } else {
+      var = clazz.field(JMod.NONE, t, prefix + index++, init);
+    }
+    Pair<Integer, JVar> depthVar = Pair.of(depth, var);
+    constantVars.put(depthVar, function);
+    return depthVar;
+  }
+
+  public Map<Pair<Integer, JVar>, Function<DrillBuf, ? extends ValueHolder>> 
getConstantVars() {
+    return constantVars;
+  }
+
   public HoldingContainer declare(MajorType t) {
     return declare(t, true);
   }
@@ -646,7 +692,7 @@ public void preparePlainJava() {
         Class<?> p = params[i];
         childNew.arg(shim.param(model._ref(p), "arg" + i));
       }
-      shim.body()._return(childNew);
+      shim.body()._return(JExpr._this().invoke("injectMembers").arg(childNew));
     }
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index 8685130e23e..e13a9adb504 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -24,6 +24,8 @@
 import java.util.Set;
 import java.util.Stack;
 
+import io.netty.buffer.DrillBuf;
+import org.apache.calcite.util.Pair;
 import org.apache.drill.common.expression.AnyValueExpression;
 import org.apache.drill.common.expression.BooleanOperator;
 import org.apache.drill.common.expression.CastExpression;
@@ -56,19 +58,20 @@
 import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
 import 
org.apache.drill.common.expression.ValueExpressions.VarDecimalExpression;
 import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.compile.sig.ConstantExpressionIdentifier;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
-import org.apache.drill.exec.expr.ClassGenerator.BlockType;
 import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
 import org.apache.drill.exec.expr.fn.AbstractFuncHolder;
+import org.apache.drill.exec.expr.holders.ValueHolder;
 import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
 import org.apache.drill.exec.vector.ValueHolderHelper;
 import org.apache.drill.exec.vector.complex.reader.FieldReader;
 
+import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
 import com.sun.codemodel.JBlock;
@@ -76,6 +79,7 @@
 import com.sun.codemodel.JConditional;
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JFieldRef;
 import com.sun.codemodel.JInvocation;
 import com.sun.codemodel.JLabel;
 import com.sun.codemodel.JType;
@@ -265,72 +269,94 @@ public HoldingContainer visitSchemaPath(SchemaPath path, 
ClassGenerator<?> gener
       throw new UnsupportedOperationException("All schema paths should have 
been replaced with ValueVectorExpressions.");
     }
 
+    private HoldingContainer getHoldingContainer(ClassGenerator<?> generator,
+                                                 MajorType majorType,
+                                                 Function<DrillBuf, ? extends 
ValueHolder> function) {
+      JType holderType = generator.getHolderType(majorType);
+      Pair<Integer, JVar> depthVar = generator.declareClassConstField("const", 
holderType, function);
+      JFieldRef outputSet = null;
+      JVar var = depthVar.getValue();
+      if (majorType.getMode() == TypeProtos.DataMode.OPTIONAL) {
+        outputSet = var.ref("isSet");
+      }
+      return new HoldingContainer(majorType, var, var.ref("value"), outputSet);
+    }
+
     @Override
     public HoldingContainer visitLongConstant(LongExpression e, 
ClassGenerator<?> generator) throws RuntimeException {
-      HoldingContainer out = generator.declare(e.getMajorType());
-      generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getLong()));
-      return out;
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getBigIntHolder(e.getLong()));
     }
 
     @Override
     public HoldingContainer visitIntConstant(IntExpression e, 
ClassGenerator<?> generator) throws RuntimeException {
-      HoldingContainer out = generator.declare(e.getMajorType());
-      generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getInt()));
-      return out;
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getIntHolder(e.getInt()));
     }
 
     @Override
     public HoldingContainer visitDateConstant(DateExpression e, 
ClassGenerator<?> generator) throws RuntimeException {
-      HoldingContainer out = generator.declare(e.getMajorType());
-      generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getDate()));
-      return out;
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getDateHolder(e.getDate()));
     }
 
     @Override
     public HoldingContainer visitTimeConstant(TimeExpression e, 
ClassGenerator<?> generator) throws RuntimeException {
-      HoldingContainer out = generator.declare(e.getMajorType());
-      generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getTime()));
-      return out;
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getTimeHolder(e.getTime()));
     }
 
     @Override
     public HoldingContainer visitIntervalYearConstant(IntervalYearExpression 
e, ClassGenerator<?> generator)
         throws RuntimeException {
-      HoldingContainer out = generator.declare(e.getMajorType());
-      generator.getEvalBlock().assign(out.getValue(), 
JExpr.lit(e.getIntervalYear()));
-      return out;
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> 
ValueHolderHelper.getIntervalYearHolder(e.getIntervalYear()));
     }
 
     @Override
     public HoldingContainer visitTimeStampConstant(TimeStampExpression e, 
ClassGenerator<?> generator)
         throws RuntimeException {
-      HoldingContainer out = generator.declare(e.getMajorType());
-      generator.getEvalBlock().assign(out.getValue(), 
JExpr.lit(e.getTimeStamp()));
-      return out;
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getTimeStampHolder(e.getTimeStamp()));
     }
 
     @Override
     public HoldingContainer visitFloatConstant(FloatExpression e, 
ClassGenerator<?> generator)
         throws RuntimeException {
-      HoldingContainer out = generator.declare(e.getMajorType());
-      generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getFloat()));
-      return out;
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getFloat4Holder(e.getFloat()));
     }
 
     @Override
     public HoldingContainer visitDoubleConstant(DoubleExpression e, 
ClassGenerator<?> generator)
         throws RuntimeException {
-      HoldingContainer out = generator.declare(e.getMajorType());
-      generator.getEvalBlock().assign(out.getValue(), 
JExpr.lit(e.getDouble()));
-      return out;
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getFloat8Holder(e.getDouble()));
     }
 
     @Override
     public HoldingContainer visitBooleanConstant(BooleanExpression e, 
ClassGenerator<?> generator)
         throws RuntimeException {
-      HoldingContainer out = generator.declare(e.getMajorType());
-      generator.getEvalBlock().assign(out.getValue(), JExpr.lit(e.getBoolean() 
? 1 : 0));
-      return out;
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getBitHolder(e.getBoolean() ? 1 : 0));
     }
 
     @Override
@@ -589,110 +615,64 @@ private HoldingContainer 
visitReturnValueExpression(ReturnValueExpression e, Cla
     @Override
     public HoldingContainer visitQuotedStringConstant(QuotedString e, 
ClassGenerator<?> generator)
         throws RuntimeException {
-      MajorType majorType = e.getMajorType();
-      JBlock setup = generator.getBlock(BlockType.SETUP);
-      JType holderType = generator.getHolderType(majorType);
-      JVar var = generator.declareClassField("string", holderType);
-      JExpression stringLiteral = JExpr.lit(e.value);
-      JExpression buffer = 
generator.getMappingSet().getIncoming().invoke("getContext").invoke("getManagedBuffer");
-      setup.assign(var,
-          
generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getVarCharHolder").arg(buffer).arg(stringLiteral));
-      return new HoldingContainer(majorType, var, null, null);
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getVarCharHolder(buffer, e.getString()));
     }
 
     @Override
     public HoldingContainer visitIntervalDayConstant(IntervalDayExpression e, 
ClassGenerator<?> generator)
         throws RuntimeException {
-      MajorType majorType = Types.required(MinorType.INTERVALDAY);
-      JBlock setup = generator.getBlock(BlockType.SETUP);
-      JType holderType = generator.getHolderType(majorType);
-      JVar var = generator.declareClassField("intervalday", holderType);
-      JExpression dayLiteral = JExpr.lit(e.getIntervalDay());
-      JExpression millisLiteral = JExpr.lit(e.getIntervalMillis());
-      setup.assign(
-          var,
-          
generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getIntervalDayHolder").arg(dayLiteral)
-              .arg(millisLiteral));
-      return new HoldingContainer(majorType, var, null, null);
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getIntervalDayHolder(e.getIntervalDay(), 
e.getIntervalMillis()));
     }
 
     @Override
     public HoldingContainer visitDecimal9Constant(Decimal9Expression e, 
ClassGenerator<?> generator)
         throws RuntimeException {
-      MajorType majorType = e.getMajorType();
-      JBlock setup = generator.getBlock(BlockType.SETUP);
-      JType holderType = generator.getHolderType(majorType);
-      JVar var = generator.declareClassField("dec9", holderType);
-      JExpression valueLiteral = JExpr.lit(e.getIntFromDecimal());
-      JExpression scaleLiteral = JExpr.lit(e.getScale());
-      JExpression precisionLiteral = JExpr.lit(e.getPrecision());
-      setup.assign(
-          var,
-          
generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getDecimal9Holder").arg(valueLiteral)
-              .arg(scaleLiteral).arg(precisionLiteral));
-      return new HoldingContainer(majorType, var, null, null);
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getDecimal9Holder(e.getIntFromDecimal(), 
e.getScale(), e.getPrecision()));
     }
 
     @Override
     public HoldingContainer visitDecimal18Constant(Decimal18Expression e, 
ClassGenerator<?> generator)
         throws RuntimeException {
-      MajorType majorType = e.getMajorType();
-      JBlock setup = generator.getBlock(BlockType.SETUP);
-      JType holderType = generator.getHolderType(majorType);
-      JVar var = generator.declareClassField("dec18", holderType);
-      JExpression valueLiteral = JExpr.lit(e.getLongFromDecimal());
-      JExpression scaleLiteral = JExpr.lit(e.getScale());
-      JExpression precisionLiteral = JExpr.lit(e.getPrecision());
-      setup.assign(
-          var,
-          
generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getDecimal18Holder").arg(valueLiteral)
-              .arg(scaleLiteral).arg(precisionLiteral));
-      return new HoldingContainer(majorType, var, null, null);
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getDecimal18Holder(e.getLongFromDecimal(), 
e.getScale(), e.getPrecision()));
     }
 
     @Override
     public HoldingContainer visitDecimal28Constant(Decimal28Expression e, 
ClassGenerator<?> generator)
         throws RuntimeException {
-      MajorType majorType = e.getMajorType();
-      JBlock setup = generator.getBlock(BlockType.SETUP);
-      JType holderType = generator.getHolderType(majorType);
-      JVar var = generator.declareClassField("dec28", holderType);
-      JExpression stringLiteral = JExpr.lit(e.getBigDecimal().toString());
-      JExpression buffer = 
generator.getMappingSet().getIncoming().invoke("getContext").invoke("getManagedBuffer");
-      setup.assign(var,
-          
generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getDecimal28Holder")
-              .arg(buffer).arg(stringLiteral));
-      return new HoldingContainer(majorType, var, null, null);
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getDecimal28Holder(buffer, 
e.getBigDecimal()));
     }
 
     @Override
     public HoldingContainer visitDecimal38Constant(Decimal38Expression e, 
ClassGenerator<?> generator)
         throws RuntimeException {
-      MajorType majorType = e.getMajorType();
-      JBlock setup = generator.getBlock(BlockType.SETUP);
-      JType holderType = generator.getHolderType(majorType);
-      JVar var = generator.declareClassField("dec38", holderType);
-      JExpression stringLiteral = JExpr.lit(e.getBigDecimal().toString());
-      JExpression buffer = 
generator.getMappingSet().getIncoming().invoke("getContext").invoke("getManagedBuffer");
-      setup.assign(var,
-          
generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getDecimal38Holder")
-              .arg(buffer).arg(stringLiteral));
-      return new HoldingContainer(majorType, var, null, null);
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getDecimal38Holder(buffer, 
e.getBigDecimal()));
     }
 
     @Override
     public HoldingContainer visitVarDecimalConstant(VarDecimalExpression e, 
ClassGenerator<?> generator)
         throws RuntimeException {
-      MajorType majorType = e.getMajorType();
-      JBlock setup = generator.getBlock(BlockType.SETUP);
-      JType holderType = generator.getHolderType(majorType);
-      JVar var = generator.declareClassField("varDec", holderType);
-      JExpression stringLiteral = JExpr.lit(e.getBigDecimal().toString());
-      JExpression buffer = 
generator.getMappingSet().getIncoming().invoke("getContext").invoke("getManagedBuffer");
-      setup.assign(var,
-          
generator.getModel().ref(ValueHolderHelper.class).staticInvoke("getVarDecimalHolder")
-              .arg(buffer).arg(stringLiteral));
-      return new HoldingContainer(majorType, var, null, null);
+      return getHoldingContainer(
+        generator,
+        e.getMajorType(),
+        buffer -> ValueHolderHelper.getVarDecimalHolder(buffer, 
e.getBigDecimal()));
     }
 
     @Override
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java
index 72d861453e9..a0373d9a72e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/interpreter/InterpreterEvaluator.java
@@ -227,7 +227,7 @@ public ValueHolder visitDecimal28Constant(final 
ValueExpressions.Decimal28Expres
         @Nullable
         @Override
         public ValueHolder apply(DrillBuf buffer) {
-          return ValueHolderHelper.getDecimal28Holder(buffer, 
decExpr.getBigDecimal().toString());
+          return ValueHolderHelper.getDecimal28Holder(buffer, 
decExpr.getBigDecimal());
         }
       });
     }
@@ -238,7 +238,7 @@ public ValueHolder visitDecimal38Constant(final 
ValueExpressions.Decimal38Expres
         @Nullable
         @Override
         public ValueHolder apply(DrillBuf buffer) {
-          return ValueHolderHelper.getDecimal38Holder(buffer, 
decExpr.getBigDecimal().toString());
+          return ValueHolderHelper.getDecimal38Holder(buffer, 
decExpr.getBigDecimal());
         }
       });
     }
@@ -246,7 +246,7 @@ public ValueHolder apply(DrillBuf buffer) {
     @Override
     public ValueHolder visitVarDecimalConstant(final 
ValueExpressions.VarDecimalExpression decExpr, Integer value) throws 
RuntimeException {
       return getConstantValueHolder(decExpr.getBigDecimal().toString(), 
decExpr.getMajorType().getMinorType(),
-          buffer -> 
ValueHolderHelper.getVarDecimalHolder(Objects.requireNonNull(buffer), 
decExpr.getBigDecimal().toString()));
+          buffer -> 
ValueHolderHelper.getVarDecimalHolder(Objects.requireNonNull(buffer), 
decExpr.getBigDecimal()));
     }
 
     @Override
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
index f81d4c98396..8005f046a9c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BaseFragmentContext.java
@@ -24,6 +24,7 @@
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.physical.impl.common.CodeGenMemberInjector;
 
 import io.netty.buffer.DrillBuf;
 
@@ -53,7 +54,9 @@ public FunctionImplementationRegistry getFunctionRegistry() {
   @Override
   public <T> T getImplementationClass(final CodeGenerator<T> cg)
       throws ClassTransformationException, IOException {
-    return getCompiler().createInstance(cg);
+    T instance = getCompiler().createInstance(cg);
+    CodeGenMemberInjector.injectMembers(cg.getRoot(), instance, this);
+    return instance;
   }
 
   @Override
@@ -63,7 +66,9 @@ public FunctionImplementationRegistry getFunctionRegistry() {
 
   @Override
   public <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final 
int instanceCount) throws ClassTransformationException, IOException {
-    return getCompiler().createInstances(cg, instanceCount);
+    List<T> instances = getCompiler().createInstances(cg, instanceCount);
+    instances.forEach(instance -> 
CodeGenMemberInjector.injectMembers(cg.getRoot(), instance, this));
+    return instances;
   }
 
   protected abstract BufferManager getBufferManager();
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 22dfdf09074..aaca8a586b0 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -379,8 +379,8 @@ private void purge() throws SchemaChangeException {
   private PriorityQueue createNewPriorityQueue(VectorAccessible batch, int 
limit)
     throws SchemaChangeException, ClassTransformationException, IOException {
     return createNewPriorityQueue(
-      mainMapping, leftMapping, rightMapping, context.getOptions(), 
context.getFunctionRegistry(), context.getCompiler(),
-      config.getOrderings(), batch, unionTypeEnabled, codegenDump, limit, 
oContext.getAllocator(), schema.getSelectionVectorMode());
+      mainMapping, leftMapping, rightMapping, config.getOrderings(), batch, 
unionTypeEnabled,
+      codegenDump, limit, oContext.getAllocator(), 
schema.getSelectionVectorMode(), context);
   }
 
   public static MappingSet createMainMappingSet() {
@@ -397,10 +397,11 @@ public static MappingSet createRightMappingSet() {
 
   public static PriorityQueue createNewPriorityQueue(
     MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping,
-    OptionSet optionSet, FunctionLookupContext functionLookupContext, 
CodeCompiler codeCompiler,
     List<Ordering> orderings, VectorAccessible batch, boolean 
unionTypeEnabled, boolean codegenDump,
-    int limit, BufferAllocator allocator, SelectionVectorMode mode)
+    int limit, BufferAllocator allocator, SelectionVectorMode mode, 
FragmentContext context)
           throws ClassTransformationException, IOException, 
SchemaChangeException {
+    OptionSet optionSet = context.getOptions();
+    FunctionLookupContext functionLookupContext = 
context.getFunctionRegistry();
     CodeGenerator<PriorityQueue> cg = 
CodeGenerator.get(PriorityQueue.TEMPLATE_DEFINITION, optionSet);
     cg.plainJavaCapable(true);
     // Uncomment out this line to debug the generated code.
@@ -438,7 +439,7 @@ public static PriorityQueue createNewPriorityQueue(
     g.rotateBlock();
     g.getEvalBlock()._return(JExpr.lit(0));
 
-    PriorityQueue q = codeCompiler.createInstance(cg);
+    PriorityQueue q = context.getImplementationClass(cg);
     q.init(limit, allocator, mode == BatchSchema.SelectionVectorMode.TWO_BYTE);
     return q;
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 80d25edb13a..0e0e3b8ca31 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -432,6 +432,7 @@ private HashAggregator createAggregatorInternal() throws 
SchemaChangeException,
     agg.setup(popConfig, htConfig, context, oContext, incoming, this,
         aggrExprs,
         cgInner.getWorkspaceTypes(),
+        cgInner,
         groupByOutFieldIds,
         this.container, extraNonNullColumns * 8 /* sizeof(BigInt) */);
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 6709cf6ddef..3d5449bb492 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -38,6 +38,7 @@
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.TypeHelper;
 
 import org.apache.drill.exec.memory.BaseAllocator;
@@ -50,6 +51,7 @@
 import org.apache.drill.exec.physical.config.HashAggregate;
 import 
org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
 import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
+import org.apache.drill.exec.physical.impl.common.CodeGenMemberInjector;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
@@ -139,7 +141,8 @@
   private HashAggBatch outgoing;
   private VectorContainer outContainer;
 
-  private FragmentContext context;
+  protected FragmentContext context;
+  protected ClassGenerator<?> cg;
   private OperatorContext oContext;
   private BufferAllocator allocator;
 
@@ -362,7 +365,7 @@ public void outputRecordValues(@Named("htRowIdx") int 
htRowIdx, @Named("outRowId
   @Override
   public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, 
FragmentContext context, OperatorContext oContext,
                     RecordBatch incoming, HashAggBatch outgoing, 
LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds,
-                    TypedFieldId[] groupByOutFieldIds, VectorContainer 
outContainer, int extraRowBytes) throws SchemaChangeException, IOException {
+                    ClassGenerator<?> cg, TypedFieldId[] groupByOutFieldIds, 
VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, 
IOException {
 
     if (valueExprs == null || valueFieldIds == null) {
       throw new IllegalArgumentException("Invalid aggr value exprs or 
workspace variables.");
@@ -377,6 +380,7 @@ public void setup(HashAggregate hashAggrConfig, 
HashTableConfig htConfig, Fragme
     this.oContext = oContext;
     this.incoming = incoming;
     this.outgoing = outgoing;
+    this.cg = cg;
     this.outContainer = outContainer;
     this.useMemoryPrediction = 
context.getOptions().getOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_VALIDATOR);
 
@@ -1102,7 +1106,12 @@ private void addBatchHolder(int part, int batchRowCount) 
{
 
   // These methods are overridden in the generated class when created as plain 
Java code.
   protected BatchHolder newBatchHolder(int batchRowCount) {
-    return new BatchHolder(batchRowCount);
+    return this.injectMembers(new BatchHolder(batchRowCount));
+  }
+
+  protected BatchHolder injectMembers(BatchHolder batchHolder) {
+    CodeGenMemberInjector.injectMembers(cg, batchHolder, context);
+    return batchHolder;
   }
 
   /**
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 4c54650cf32..5ee77ab27df 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -24,6 +24,7 @@
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.HashAggregate;
@@ -46,8 +47,10 @@
   // OK - batch returned, NONE - end of data, RESTART - call again, EMIT - 
like OK but EMIT
   enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART, AGG_EMIT }
 
-  void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, 
FragmentContext context, OperatorContext oContext, RecordBatch incoming, 
HashAggBatch outgoing,
-             LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, 
TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) 
throws SchemaChangeException, IOException, ClassTransformationException;
+  void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, 
FragmentContext context,
+             OperatorContext oContext, RecordBatch incoming, HashAggBatch 
outgoing,
+             LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, 
ClassGenerator<?> cg,
+             TypedFieldId[] keyFieldIds, VectorContainer outContainer, int 
extraRowBytes) throws SchemaChangeException, IOException, 
ClassTransformationException;
 
   IterOutcome getOutcome();
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index a14bf8c5fe6..dcdac954f24 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -228,7 +228,7 @@ public HashTable createAndSetupHashTable(TypedFieldId[] 
outKeyFieldIds) throws C
     setupGetHash(cg /* use top level code generator for getHash */, 
GetHashIncomingProbeMapping, incomingProbe, keyExprsProbe, true);
 
     HashTable ht = context.getImplementationClass(top);
-    ht.setup(htConfig, allocator, incomingBuild.getContainer(), incomingProbe, 
outgoing, htContainerOrig);
+    ht.setup(htConfig, allocator, incomingBuild.getContainer(), incomingProbe, 
outgoing, htContainerOrig, context, cgInner);
 
     return ht;
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/CodeGenMemberInjector.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/CodeGenMemberInjector.java
new file mode 100644
index 00000000000..195f002b30d
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/CodeGenMemberInjector.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+import com.sun.codemodel.JVar;
+import io.netty.buffer.DrillBuf;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.holders.ValueHolder;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.shaded.guava.com.google.common.base.Function;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CodeGenMemberInjector {
+
+  /**
+   * Generated code for a class may have several class members, they
+   * are initialized by invoking this method when the instance created.
+   *
+   * @param cg       the class generator
+   * @param instance the class instance created by the compiler
+   * @param context  the fragment context
+   */
+  public static void injectMembers(ClassGenerator<?> cg, Object instance, 
FragmentContext context) {
+    Map<Integer, Object> cachedInstances = new HashMap<>();
+    for (Map.Entry<Pair<Integer, JVar>, Function<DrillBuf, ? extends 
ValueHolder>> setter : cg.getConstantVars().entrySet()) {
+      try {
+        JVar var = setter.getKey().getValue();
+        Integer depth = setter.getKey().getKey();
+        Object varInstance = getFieldInstance(instance, depth, 
cachedInstances);
+        Field field = varInstance.getClass().getDeclaredField(var.name());
+        field.setAccessible(true);
+        field.set(varInstance, 
setter.getValue().apply(context.getManagedBuffer()));
+      } catch (ReflectiveOperationException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private static Object getFieldInstance(Object instance, Integer depth, 
Map<Integer, Object> cache) throws ReflectiveOperationException {
+    if (depth <= 1) {
+      return instance;
+    }
+    Object methodInstance = cache.get(depth);
+    if (methodInstance != null) {
+      return methodInstance;
+    }
+    methodInstance = getFieldInstance(instance, depth);
+    cache.put(depth, methodInstance);
+    return methodInstance;
+  }
+
+  private static Object getFieldInstance(Object instance, Integer depth) 
throws ReflectiveOperationException {
+    if (depth <= 1) {
+      return instance;
+    }
+    Field field = 
instance.getClass().getDeclaredField(ClassGenerator.INNER_CLASS_FIELD_NAME);
+    field.setAccessible(true);
+    return getFieldInstance(field.get(instance), depth - 1);
+  }
+}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 3bf4b86ecf4..e4144a50170 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -19,7 +19,9 @@
 
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.common.exceptions.RetryAfterSpillException;
@@ -47,17 +49,19 @@
   int BATCH_MASK = 0x0000FFFF;
 
   /**
-   * {@link HashTable#setup(HashTableConfig, BufferAllocator, VectorContainer, 
RecordBatch, RecordBatch, VectorContainer)} must be called before anything can 
be done to the
-   * {@link HashTable}.
+   * {@link HashTable#setup} must be called before anything can be done to the 
{@link HashTable}.
+   *
    * @param htConfig
    * @param allocator
    * @param incomingBuild
    * @param incomingProbe
    * @param outgoing
    * @param htContainerOrig
+   * @param context
+   * @param cg
    */
   void setup(HashTableConfig htConfig, BufferAllocator allocator, 
VectorContainer incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing,
-             VectorContainer htContainerOrig);
+             VectorContainer htContainerOrig, FragmentContext context, 
ClassGenerator<?> cg);
 
   /**
    * Updates the incoming (build and probe side) value vectors references in 
the {@link HashTableTemplate.BatchHolder}s.
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index fd4220207d5..9bf4f53a242 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -23,6 +23,8 @@
 
 import javax.inject.Named;
 
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
@@ -109,6 +111,10 @@
 
   private MaterializedField dummyIntField;
 
+  protected FragmentContext context;
+
+  protected ClassGenerator<?> cg;
+
   private int numResizing = 0;
 
   private int resizingTime = 0;
@@ -445,7 +451,9 @@ public long getActualSize() {
   }
 
   @Override
-  public void setup(HashTableConfig htConfig, BufferAllocator allocator, 
VectorContainer incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, 
VectorContainer htContainerOrig) {
+  public void setup(HashTableConfig htConfig, BufferAllocator allocator, 
VectorContainer incomingBuild,
+                    RecordBatch incomingProbe, RecordBatch outgoing, 
VectorContainer htContainerOrig,
+                    FragmentContext context, ClassGenerator<?> cg) {
     float loadf = htConfig.getLoadFactor();
     int initialCap = htConfig.getInitialCapacity();
 
@@ -469,6 +477,8 @@ public void setup(HashTableConfig htConfig, BufferAllocator 
allocator, VectorCon
     this.incomingProbe = incomingProbe;
     this.outgoing = outgoing;
     this.htContainerOrig = htContainerOrig;
+    this.context = context;
+    this.cg = cg;
     this.allocationTracker = new HashTableAllocationTracker(htConfig);
 
     // round up the initial capacity to nearest highest power of 2
@@ -761,7 +771,12 @@ private boolean addBatchIfNeeded(int currentIdx, int 
batchRowCount) throws Schem
   }
 
   protected BatchHolder newBatchHolder(int index, int newBatchHolderSize) { // 
special method to allow debugging of gen code
-    return new BatchHolder(index, newBatchHolderSize);
+    return this.injectMembers(new BatchHolder(index, newBatchHolderSize));
+  }
+
+  protected BatchHolder injectMembers(BatchHolder batchHolder) {
+    CodeGenMemberInjector.injectMembers(cg, batchHolder, context);
+    return batchHolder;
   }
 
   // Resize the hash table if needed by creating a new one with double the 
number of buckets.
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 929811ce508..9f8974b294d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -1325,8 +1325,7 @@ public HashJoinProbe setupHashJoinProbe() throws 
ClassTransformationException, I
 
     //  No real code generation !!
 
-    final HashJoinProbe hj = context.getImplementationClass(cg);
-    return hj;
+    return context.getImplementationClass(cg);
   }
 
   @Override
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 3918d27f616..c185ac7ac73 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -229,40 +229,7 @@ public boolean innerNext() {
 
   @VisibleForTesting
   protected void createPartitioner() throws SchemaChangeException {
-    final int divisor = Math.max(1, outGoingBatchCount/actualPartitions);
-    final int longTail = outGoingBatchCount % actualPartitions;
-
-    final List<Partitioner> subPartitioners = 
createClassInstances(actualPartitions);
-    int startIndex = 0;
-    int endIndex = 0;
-
-    boolean success = false;
-    try {
-      for (int i = 0; i < actualPartitions; i++) {
-        startIndex = endIndex;
-        endIndex = (i < actualPartitions - 1) ? startIndex + divisor : 
outGoingBatchCount;
-        if (i < longTail) {
-          endIndex++;
-        }
-        final OperatorStats partitionStats = new OperatorStats(stats, true);
-        subPartitioners.get(i).setup(context, incoming, popConfig, 
partitionStats, oContext,
-            startIndex, endIndex);
-      }
-
-      partitioner = new PartitionerDecorator(subPartitioners, stats, context);
-      for (int index = 0; index < terminations.size(); index++) {
-        partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
-      }
-      terminations.clear();
-
-      success = true;
-    } finally {
-      if (!success) {
-        for (Partitioner p : subPartitioners) {
-          p.clear();
-        }
-      }
-    }
+    createClassInstances(actualPartitions);
   }
 
   private List<Partitioner> createClassInstances(int actualPartitions) throws 
SchemaChangeException {
@@ -297,6 +264,39 @@ protected void createPartitioner() throws 
SchemaChangeException {
     try {
       // compile and setup generated code
       List<Partitioner> subPartitioners = context.getImplementationClass(cg, 
actualPartitions);
+
+      final int divisor = Math.max(1, outGoingBatchCount/actualPartitions);
+      final int longTail = outGoingBatchCount % actualPartitions;
+      int startIndex = 0;
+      int endIndex = 0;
+
+      boolean success = false;
+      try {
+        for (int i = 0; i < actualPartitions; i++) {
+          startIndex = endIndex;
+          endIndex = (i < actualPartitions - 1) ? startIndex + divisor : 
outGoingBatchCount;
+          if (i < longTail) {
+            endIndex++;
+          }
+          final OperatorStats partitionStats = new OperatorStats(stats, true);
+          subPartitioners.get(i).setup(context, incoming, popConfig, 
partitionStats, oContext,
+            cgInner, startIndex, endIndex);
+        }
+
+        partitioner = new PartitionerDecorator(subPartitioners, stats, 
context);
+        for (int index = 0; index < terminations.size(); index++) {
+          
partitioner.getOutgoingBatches(terminations.buffer[index]).terminate();
+        }
+        terminations.clear();
+
+        success = true;
+      } finally {
+        if (!success) {
+          for (Partitioner p : subPartitioners) {
+            p.clear();
+          }
+        }
+      }
       return subPartitioners;
 
     } catch (ClassTransformationException | IOException e) {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 316c2625cf5..723c0b568a9 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -22,6 +22,7 @@
 
 import org.apache.drill.exec.compile.TemplateClassDefinition;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.ops.ExchangeFragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
@@ -34,6 +35,7 @@ void setup(ExchangeFragmentContext context,
              HashPartitionSender popConfig,
              OperatorStats stats,
              OperatorContext oContext,
+             ClassGenerator<?> cg,
              int start, int count) throws SchemaChangeException;
 
   void partitionBatch(RecordBatch incoming) throws IOException;
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index ea31a79cb02..687ff814ad3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -26,6 +26,7 @@
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
@@ -35,6 +36,7 @@
 import org.apache.drill.exec.ops.OperatorStats;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.HashPartitionSender;
+import org.apache.drill.exec.physical.impl.common.CodeGenMemberInjector;
 import 
org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.Metric;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.BatchSchema;
@@ -62,6 +64,8 @@
   private SelectionVector4 sv4;
   private RecordBatch incoming;
   private OperatorStats stats;
+  protected ClassGenerator<?> cg;
+  protected FragmentContext context;
   private int start;
   private int end;
   private List<OutgoingRecordBatch> outgoingBatches = Lists.newArrayList();
@@ -87,10 +91,13 @@ public final void setup(ExchangeFragmentContext context,
                           HashPartitionSender popConfig,
                           OperatorStats stats,
                           OperatorContext oContext,
+                          ClassGenerator<?> cg,
                           int start, int end) throws SchemaChangeException {
 
     this.incoming = incoming;
     this.stats = stats;
+    this.context = context;
+    this.cg = cg;
     this.start = start;
     this.end = end;
     doSetup(context, incoming, null);
@@ -144,7 +151,12 @@ public final void setup(ExchangeFragmentContext context,
   protected OutgoingRecordBatch newOutgoingRecordBatch(
                                OperatorStats stats, HashPartitionSender 
operator, AccountingDataTunnel tunnel,
                                FragmentContext context, BufferAllocator 
allocator, int oppositeMinorFragmentId) {
-    return new OutgoingRecordBatch(stats, operator, tunnel, context, 
allocator, oppositeMinorFragmentId);
+    return this.injectMembers(new OutgoingRecordBatch(stats, operator, tunnel, 
context, allocator, oppositeMinorFragmentId));
+  }
+
+  protected OutgoingRecordBatch injectMembers(OutgoingRecordBatch 
outgoingRecordBatch) {
+    CodeGenMemberInjector.injectMembers(cg, outgoingRecordBatch, context);
+    return outgoingRecordBatch;
   }
 
   @Override
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/ExampleTemplateWithInner.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/ExampleTemplateWithInner.java
index 3153cd0a8a4..f48315d0ee8 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/ExampleTemplateWithInner.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/ExampleTemplateWithInner.java
@@ -72,7 +72,11 @@ public void doDouble() throws SchemaChangeException {
     }
 
     protected DoubleInner newDoubleInner() {
-      return new DoubleInner();
+      return this.injectMembers(new DoubleInner());
+    }
+
+    protected DoubleInner injectMembers(DoubleInner doubleInner) {
+      return doubleInner;
     }
 
     public class DoubleInner {
@@ -101,6 +105,10 @@ public void doInsideOutside() throws SchemaChangeException 
{
    * subclass (or replacement) of the template inner class
    */
   protected TheInnerClass newTheInnerClass( ) {
-    return new TheInnerClass();
+    return this.injectMembers(new TheInnerClass());
+  }
+
+  protected TheInnerClass injectMembers(TheInnerClass theInnerClass) {
+    return theInnerClass;
   }
 }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
index 084107ddec5..ecff4e10dcd 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestLargeFileCompilation.java
@@ -17,6 +17,8 @@
  */
 package org.apache.drill.exec.compile;
 
+import java.util.concurrent.ThreadLocalRandom;
+
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.drill.test.TestTools;
@@ -39,6 +41,8 @@
 
   private static final String LARGE_QUERY_FILTER;
 
+  private static final String HUGE_STRING_CONST_QUERY;
+
   private static final String LARGE_QUERY_WRITER;
 
   private static final String LARGE_QUERY_SELECT_LIST;
@@ -109,6 +113,21 @@
     LARGE_QUERY_FILTER = sb.append(" true") .toString();
   }
 
+  static {
+    final char[] alphabet = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+    int len = 1 << 18;
+    char[] longText = new char[len];
+    for (int j = 0; j < len; ++j) {
+      longText[j] = alphabet[ThreadLocalRandom.current().nextInt(0, 
alphabet.length)];
+    }
+    StringBuilder sb = new StringBuilder("select *\n")
+      .append("from cp.`employee.json`\n")
+      .append("where last_name ='")
+      .append(longText)
+      .append("'");
+    HUGE_STRING_CONST_QUERY = sb.toString();
+  }
+
   static {
     LARGE_QUERY_WRITER = createTableWithColsCount(NUM_PROJECT_COLUMNS);
     LARGE_TABLE_WRITER = createTableWithColsCount(NUM_JOIN_TABLE_COLUMNS);
@@ -228,4 +247,24 @@ public void testNestedLoopJoin() throws Exception {
       testNoResult("drop table if exists %s", tableName);
     }
   }
+
+  @Test
+  public void testJDKHugeStringConstantCompilation() throws Exception {
+    try {
+      setSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION, "JDK");
+      testNoResult(ITERATION_COUNT, HUGE_STRING_CONST_QUERY);
+    } finally {
+      resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
+    }
+  }
+
+  @Test
+  public void testJaninoHugeStringConstantCompilation() throws Exception {
+    try {
+      setSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION, "JANINO");
+      testNoResult(ITERATION_COUNT, HUGE_STRING_CONST_QUERY);
+    } finally {
+      resetSessionOption(ClassCompilerSelector.JAVA_COMPILER_OPTION);
+    }
+  }
 }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java
index 7537cfbeaa0..e3731a8e858 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TopN/TopNBatchTest.java
@@ -21,6 +21,9 @@
 import java.util.Properties;
 import java.util.Random;
 
+import org.apache.drill.exec.ops.FragmentContextImpl;
+import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.test.ClusterFixtureBuilder;
@@ -31,7 +34,6 @@
 import org.apache.drill.common.logical.data.Order;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.compile.CodeCompiler;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.RootAllocator;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
@@ -50,6 +52,8 @@
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.mockito.Mockito.when;
+
 @Category(OperatorTest.class)
 public class TopNBatchTest extends PopUnitTestBase {
   @Rule
@@ -63,6 +67,8 @@
   public void priorityQueueOrderingTest() throws Exception {
     Properties properties = new Properties();
     DrillConfig drillConfig = DrillConfig.create(properties);
+    DrillbitContext drillbitContext = mockDrillbitContext();
+    when(drillbitContext.getFunctionImplementationRegistry()).thenReturn(new 
FunctionImplementationRegistry(drillConfig));
 
     FieldReference expr = FieldReference.getWithQuotedRef("colA");
     Order.Ordering ordering = new Order.Ordering(Order.Ordering.ORDER_DESC, 
expr, Order.Ordering.NULLS_FIRST);
@@ -73,6 +79,9 @@ public void priorityQueueOrderingTest() throws Exception {
 
     List<MaterializedField> cols = Lists.newArrayList(colA, colB);
     BatchSchema batchSchema = new 
BatchSchema(BatchSchema.SelectionVectorMode.NONE, cols);
+    FragmentContextImpl context = new FragmentContextImpl(drillbitContext,
+      BitControl.PlanFragment.getDefaultInstance(), null,
+      drillbitContext.getFunctionImplementationRegistry());
     RowSet expectedRowSet;
 
     try (RootAllocator allocator = new RootAllocator(100_000_000)) {
@@ -100,12 +109,10 @@ public void priorityQueueOrderingTest() throws Exception {
 
         queue = TopNBatch.createNewPriorityQueue(
           TopNBatch.createMainMappingSet(), TopNBatch.createLeftMappingSet(),
-          TopNBatch.createRightMappingSet(), optionManager,
-          new FunctionImplementationRegistry(drillConfig),
-          new CodeCompiler(drillConfig, optionManager),
+          TopNBatch.createRightMappingSet(),
           orderings, hyperContainer, false,
           true, 10, allocator,
-          batchSchema.getSelectionVectorMode());
+          batchSchema.getSelectionVectorMode(), context);
       }
 
       List<RecordBatchData> testBatches = Lists.newArrayList();
diff --git 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java
index 52afbe0cbec..7087687c12e 100644
--- 
a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java
+++ 
b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueHolderHelper.java
@@ -169,10 +169,13 @@ public static Decimal18Holder getDecimal18Holder(long 
decimal, int scale, int pr
   }
 
   public static Decimal28SparseHolder getDecimal28Holder(DrillBuf buf, String 
decimal) {
+    BigDecimal bigDecimal = new BigDecimal(decimal);
 
-    Decimal28SparseHolder dch = new Decimal28SparseHolder();
+    return getDecimal28Holder(buf, bigDecimal);
+  }
 
-    BigDecimal bigDecimal = new BigDecimal(decimal);
+  public static Decimal28SparseHolder getDecimal28Holder(DrillBuf buf, 
BigDecimal bigDecimal) {
+    Decimal28SparseHolder dch = new Decimal28SparseHolder();
 
     dch.scale = bigDecimal.scale();
     dch.precision = bigDecimal.precision();
@@ -180,33 +183,40 @@ public static Decimal28SparseHolder 
getDecimal28Holder(DrillBuf buf, String deci
     dch.start = 0;
     dch.buffer = buf.reallocIfNeeded(5 * DecimalUtility.INTEGER_SIZE);
     DecimalUtility
-        .getSparseFromBigDecimal(bigDecimal, dch.buffer, dch.start, dch.scale, 
Decimal28SparseHolder.nDecimalDigits);
+      .getSparseFromBigDecimal(bigDecimal, dch.buffer, dch.start, dch.scale, 
Decimal28SparseHolder.nDecimalDigits);
 
     return dch;
   }
 
   public static Decimal38SparseHolder getDecimal38Holder(DrillBuf buf, String 
decimal) {
+      BigDecimal bigDecimal = new BigDecimal(decimal);
 
-      Decimal38SparseHolder dch = new Decimal38SparseHolder();
+      return getDecimal38Holder(buf, bigDecimal);
+  }
 
-      BigDecimal bigDecimal = new BigDecimal(decimal);
+  public static Decimal38SparseHolder getDecimal38Holder(DrillBuf buf, 
BigDecimal bigDecimal) {
+    Decimal38SparseHolder dch = new Decimal38SparseHolder();
 
-      dch.scale = bigDecimal.scale();
-      dch.precision = bigDecimal.precision();
-      Decimal38SparseHolder.setSign(bigDecimal.signum() == -1, dch.start, 
dch.buffer);
-      dch.start = 0;
+    dch.scale = bigDecimal.scale();
+    dch.precision = bigDecimal.precision();
+    Decimal38SparseHolder.setSign(bigDecimal.signum() == -1, dch.start, 
dch.buffer);
+    dch.start = 0;
     dch.buffer = buf.reallocIfNeeded(Decimal38SparseHolder.maxPrecision * 
DecimalUtility.INTEGER_SIZE);
     DecimalUtility
-        .getSparseFromBigDecimal(bigDecimal, dch.buffer, dch.start, dch.scale, 
Decimal38SparseHolder.nDecimalDigits);
+      .getSparseFromBigDecimal(bigDecimal, dch.buffer, dch.start, dch.scale, 
Decimal38SparseHolder.nDecimalDigits);
 
-      return dch;
+    return dch;
   }
 
   public static VarDecimalHolder getVarDecimalHolder(DrillBuf buf, String 
decimal) {
-    VarDecimalHolder dch = new VarDecimalHolder();
-
     BigDecimal bigDecimal = new BigDecimal(decimal);
 
+    return getVarDecimalHolder(buf, bigDecimal);
+  }
+
+  public static VarDecimalHolder getVarDecimalHolder(DrillBuf buf, BigDecimal 
bigDecimal) {
+    VarDecimalHolder dch = new VarDecimalHolder();
+
     byte[] bytes = bigDecimal.unscaledValue().toByteArray();
     int length = bytes.length;
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to