Rewrite convert_from/convert_to functions to actual function implementations
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/3c2402c9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/3c2402c9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/3c2402c9 Branch: refs/heads/master Commit: 3c2402c951a761e489241e768f551ff2c2cf8ea6 Parents: 415506b Author: Mehant Baid <[email protected]> Authored: Thu Jul 17 16:59:12 2014 -0700 Committer: Aditya Kishore <[email protected]> Committed: Thu Jul 24 16:16:01 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/expr/fn/DrillFuncHolder.java | 8 ++ .../org/apache/drill/exec/ops/QueryContext.java | 7 ++ .../exec/planner/logical/RewriteProjectRel.java | 106 +++++++++++++++++++ .../exec/planner/sql/DrillOperatorTable.java | 5 + .../drill/exec/planner/sql/DrillSqlWorker.java | 3 +- .../planner/sql/handlers/DefaultSqlHandler.java | 10 +- .../java/org/apache/drill/PlanningBase.java | 4 + 7 files changed, 140 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3c2402c9/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java index f259d96..d066a00 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillFuncHolder.java @@ -291,6 +291,10 @@ public abstract class DrillFuncHolder extends AbstractFuncHolder { return ref; } + public boolean isComplexWriter() { + return isComplexWriter; + } + } public static class WorkspaceReference { @@ -319,4 +323,8 @@ public abstract class 
DrillFuncHolder extends AbstractFuncHolder { public MajorType getReturnType() { return returnValue.type; } + + public ValueReference getReturnValue() { + return returnValue; + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3c2402c9/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java index a3e1525..be3a3ef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java @@ -27,6 +27,7 @@ import org.apache.drill.exec.cache.DistributedCache; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.planner.sql.DrillOperatorTable; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.rpc.control.WorkEventBus; @@ -46,6 +47,7 @@ public class QueryContext{ private UserSession session; public final Multitimer<QuerySetup> timer; private final PlannerSettings plannerSettings; + private final DrillOperatorTable table; public QueryContext(UserSession session, QueryId queryId, DrillbitContext drllbitContext) { super(); @@ -56,6 +58,7 @@ public class QueryContext{ this.timer = new Multitimer<>(QuerySetup.class); this.plannerSettings = new PlannerSettings(session.getOptions()); this.plannerSettings.setNumEndPoints(this.getActiveEndpoints().size()); + this.table = new DrillOperatorTable(getFunctionRegistry()); } public PStoreProvider getPersistentStoreProvider(){ @@ -130,4 +133,8 @@ public class QueryContext{ return 
drillbitContext.getFunctionImplementationRegistry(); } + public DrillOperatorTable getDrillOperatorTable() { + return table; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3c2402c9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RewriteProjectRel.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RewriteProjectRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RewriteProjectRel.java new file mode 100644 index 0000000..0e4c71d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RewriteProjectRel.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.logical; + +import org.apache.drill.exec.planner.sql.DrillOperatorTable; +import org.eigenbase.rel.ProjectRel; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.RelShuttleImpl; +import org.eigenbase.reltype.RelDataTypeFactory; +import org.eigenbase.rex.RexBuilder; +import org.eigenbase.rex.RexCall; +import org.eigenbase.rex.RexLiteral; +import org.eigenbase.rex.RexNode; +import org.eigenbase.sql.SqlFunction; +import org.eigenbase.sql.SqlOperator; +import org.eigenbase.util.NlsString; + +import java.util.ArrayList; +import java.util.List; + +/** + * This class rewrites all the project expression that contain convert_to/ convert_from + * to actual implementations. + * Eg: convert_from(EXPR, 'JSON') is rewritten as convert_fromjson(EXPR) + * + * With the actual method name we can find out if the function has a complex + * output type and we will fire/ ignore certain rules (merge project rule) based on this fact. + */ +public class RewriteProjectRel extends RelShuttleImpl { + + RelDataTypeFactory factory; + DrillOperatorTable table; + + public RewriteProjectRel(RelDataTypeFactory factory, DrillOperatorTable table) { + super(); + this.factory = factory; + this.table = table; + } + + @Override + public RelNode visit(ProjectRel project) { + + List<RexNode> exprList = new ArrayList<>(); + boolean rewrite = false; + + for (RexNode rex : project.getChildExps()) { + RexNode newExpr = rex; + if (rex instanceof RexCall) { + RexCall function = (RexCall) rex; + String functionName = function.getOperator().getName(); + int nArgs = function.getOperands().size(); + + // check if its a convert_from or convert_to function + if (functionName.equalsIgnoreCase("convert_from") || functionName.equalsIgnoreCase("convert_to")) { + assert nArgs == 2 && function.getOperands().get(1) instanceof RexLiteral; + String literal = ((NlsString) (((RexLiteral) function.getOperands().get(1)).getValue())).getValue(); + RexBuilder builder = new 
RexBuilder(factory); + + // construct the new function name based on the input argument + String newFunctionName = functionName + literal; + + // Look up the new function name in the drill operator table + List<SqlOperator> operatorList = table.getSqlOperator(newFunctionName); + assert operatorList.size() > 0; + SqlFunction newFunction = null; + + // Find the SqlFunction with the correct args + for (SqlOperator op : operatorList) { + if (op.getOperandTypeChecker().getOperandCountRange().isValidCount(nArgs - 1)) { + newFunction = (SqlFunction) op; + break; + } + } + assert newFunction != null; + + // create the new expression to be used in the rewritten project + newExpr = builder.makeCall(newFunction, function.getOperands().subList(0, 1)); + rewrite = true; + } + } + exprList.add(newExpr); + } + + if (rewrite == true) { + ProjectRel newProject = project.copy(project.getTraitSet(), project.getInput(0), exprList, project.getRowType()); + return visitChild(newProject, 0, project.getChild()); + } + + return visitChild(project, 0, project.getChild()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3c2402c9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java index 9ffbb06..e34d3d1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java @@ -79,4 +79,9 @@ public class DrillOperatorTable extends SqlStdOperatorTable { public List<SqlOperator> getOperatorList() { return operators; } + + // Get the list of SqlOperator's with the given name. 
+ public List<SqlOperator> getSqlOperator(String name) { + return opMap.get(name.toLowerCase()); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3c2402c9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java index cc779ad..4cbd674 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java @@ -66,14 +66,13 @@ public class DrillSqlWorker { traitDefs.add(DrillDistributionTraitDef.INSTANCE); traitDefs.add(RelCollationTraitDef.INSTANCE); this.context = context; - DrillOperatorTable table = new DrillOperatorTable(context.getFunctionRegistry()); RelOptCostFactory costFactory = (context.getPlannerSettings().useDefaultCosting()) ? 
null : new DrillCostBase.DrillCostFactory() ; StdFrameworkConfig config = StdFrameworkConfig.newBuilder() // .lex(Lex.MYSQL) // .parserFactory(DrillParserWithCompoundIdConverter.FACTORY) // .defaultSchema(context.getNewDefaultSchema()) // - .operatorTable(table) // + .operatorTable(context.getDrillOperatorTable()) // .traitDefs(traitDefs) // .convertletTable(new DrillConvertletTable()) // .context(context.getPlannerSettings()) // http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3c2402c9/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index 2fcdef3..a3effd9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -38,6 +38,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.planner.logical.DrillRel; import org.apache.drill.exec.planner.logical.DrillScreenRel; import org.apache.drill.exec.planner.logical.DrillStoreRel; +import org.apache.drill.exec.planner.logical.RewriteProjectRel; import org.apache.drill.exec.planner.physical.DrillDistributionTrait; import org.apache.drill.exec.planner.physical.PhysicalPlanCreator; import org.apache.drill.exec.planner.physical.PlannerSettings; @@ -50,11 +51,13 @@ import org.apache.drill.exec.planner.physical.visitor.JoinPrelRenameVisitor; import org.apache.drill.exec.planner.physical.visitor.ProducerConsumerPrelVisitor; import org.apache.drill.exec.planner.physical.visitor.RelUniqifier; import org.apache.drill.exec.planner.physical.visitor.SelectionVectorPrelVisitor; +import 
org.apache.drill.exec.planner.sql.DrillOperatorTable; import org.apache.drill.exec.planner.sql.DrillSqlWorker; import org.apache.drill.exec.util.Pointer; import org.eigenbase.rel.RelNode; import org.eigenbase.relopt.RelOptUtil; import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.rex.RexBuilder; import org.eigenbase.sql.SqlExplainLevel; import org.eigenbase.sql.SqlNode; @@ -112,6 +115,12 @@ public class DefaultSqlHandler extends AbstractSqlHandler { SqlNode rewrittenSqlNode = rewrite(sqlNode); SqlNode validated = validateNode(rewrittenSqlNode); RelNode rel = convertToRel(validated); + + /* Traverse the tree and replace the convert_from, convert_to function to actual implementations + * Eg: convert_from(EXPR, 'JSON') be converted to convert_fromjson(EXPR); + * TODO: Ideally all function rewrites would move here instead of DrillOptiq + */ + rel = rel.accept(new RewriteProjectRel(planner.getTypeFactory(), context.getDrillOperatorTable())); log("Optiq Logical", rel); DrillRel drel = convertToDrel(rel); log("Drill Logical", drel); @@ -120,7 +129,6 @@ public class DefaultSqlHandler extends AbstractSqlHandler { PhysicalOperator pop = convertToPop(prel); PhysicalPlan plan = convertToPlan(pop); log("Drill Plan", plan); - return plan; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3c2402c9/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java index 741323b..3e0dc25 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanningBase.java @@ -35,6 +35,7 @@ import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.PhysicalPlan; import 
org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.planner.sql.DrillOperatorTable; import org.apache.drill.exec.planner.sql.DrillSqlWorker; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.rpc.user.UserSession; @@ -99,6 +100,7 @@ public class PlanningBase extends ExecTest{ final StoragePluginRegistry registry = new StoragePluginRegistry(dbContext); registry.init(); final FunctionImplementationRegistry functionRegistry = new FunctionImplementationRegistry(config); + final DrillOperatorTable table = new DrillOperatorTable(functionRegistry); final SchemaPlus root = Frameworks.createRootSchema(false); registry.getSchemaFactory().registerSchemas(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(), root); @@ -125,6 +127,8 @@ public class PlanningBase extends ExecTest{ result = config; context.getCache(); result = cache; + context.getDrillOperatorTable(); + result = table; } };
