This is an automated email from the ASF dual-hosted git repository. ibzib pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 5127734 [BEAM-10925] Add rule to replace Calc with BeamCalcRel for ZetaSQL UDFs. new a497ff2 Merge pull request #13841 from ibzib/calc-rule 5127734 is described below commit 5127734d44e83d82776f49219bb662656e388b5b Author: Kyle Weaver <kcwea...@google.com> AuthorDate: Fri Jan 29 13:12:23 2021 -0800 [BEAM-10925] Add rule to replace Calc with BeamCalcRel for ZetaSQL UDFs. --- ...taSqlCalcRule.java => BeamJavaUdfCalcRule.java} | 15 ++++---- .../sql/zetasql/BeamZetaSqlCalcRule.java | 2 +- .../sdk/extensions/sql/zetasql/SqlAnalyzer.java | 3 ++ .../sql/zetasql/ZetaSQLQueryPlanner.java | 41 +++++++++++++++++++++- 4 files changed, 52 insertions(+), 9 deletions(-) diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRule.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamJavaUdfCalcRule.java similarity index 82% copy from sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRule.java copy to sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamJavaUdfCalcRule.java index 2e7ea0f..23d0f76 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRule.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamJavaUdfCalcRule.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.extensions.sql.zetasql; +import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.Convention; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRule; @@ -26,18 +27,18 @@ import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.convert.Con import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Calc; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalCalc; -/** A {@code ConverterRule} to replace {@link Calc} with {@link BeamZetaSqlCalcRel}. */ -public class BeamZetaSqlCalcRule extends ConverterRule { - public static final BeamZetaSqlCalcRule INSTANCE = new BeamZetaSqlCalcRule(); +/** {@link ConverterRule} to replace {@link Calc} with {@link BeamCalcRel}. */ +public class BeamJavaUdfCalcRule extends ConverterRule { + public static final BeamJavaUdfCalcRule INSTANCE = new BeamJavaUdfCalcRule(); - private BeamZetaSqlCalcRule() { + private BeamJavaUdfCalcRule() { super( - LogicalCalc.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamZetaSqlCalcRule"); + LogicalCalc.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamJavaUdfCalcRule"); } @Override public boolean matches(RelOptRuleCall x) { - return true; + return ZetaSQLQueryPlanner.hasUdfInProjects(x); } @Override @@ -45,7 +46,7 @@ public class BeamZetaSqlCalcRule extends ConverterRule { final Calc calc = (Calc) rel; final RelNode input = calc.getInput(); - return new BeamZetaSqlCalcRel( + return new BeamCalcRel( calc.getCluster(), calc.getTraitSet().replace(BeamLogicalConvention.INSTANCE), RelOptRule.convert(input, input.getTraitSet().replace(BeamLogicalConvention.INSTANCE)), diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRule.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRule.java index 2e7ea0f..2f6c60d 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRule.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRule.java @@ -37,7 +37,7 @@ public class BeamZetaSqlCalcRule extends ConverterRule { @Override public boolean matches(RelOptRuleCall x) { - return true; + return !ZetaSQLQueryPlanner.hasUdfInProjects(x); } @Override diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java index f4db1f1..4889183 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java @@ -78,6 +78,9 @@ public class SqlAnalyzer { */ public static final String ZETASQL_FUNCTION_GROUP_NAME = "ZetaSQL"; + public static final String USER_DEFINED_JAVA_SCALAR_FUNCTIONS = + "user_defined_java_scalar_functions"; + private static final ImmutableSet<ResolvedNodeKind> SUPPORTED_STATEMENT_KINDS = ImmutableSet.of( RESOLVED_QUERY_STMT, RESOLVED_CREATE_FUNCTION_STMT, RESOLVED_CREATE_TABLE_FUNCTION_STMT); diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java index b943ab3..9ca5e83 100644 --- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java +++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java @@ -36,22 +36,29 @@ import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; import org.apache.beam.sdk.extensions.sql.impl.rule.BeamCalcRule; import org.apache.beam.sdk.extensions.sql.impl.rule.BeamUncollectRule; import org.apache.beam.sdk.extensions.sql.impl.rule.BeamUnnestRule; +import org.apache.beam.sdk.extensions.sql.zetasql.translation.ZetaSqlScalarFunctionImpl; import org.apache.beam.sdk.extensions.sql.zetasql.unnest.BeamZetaSqlUncollectRule; import org.apache.beam.sdk.extensions.sql.zetasql.unnest.BeamZetaSqlUnnestRule; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.config.CalciteConnectionConfig; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.jdbc.CalciteSchema; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.ConventionTraitDef; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRule; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRuleCall; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptUtil; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitDef; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.CalciteCatalogReader; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelRoot; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalCalc; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.ChainedRelMetadataProvider; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.JaninoRelMetadataProvider; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.FilterCalcMergeRule; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.JoinCommuteRule; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectCalcMergeRule; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexCall; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlNode; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlOperatorTable; @@ -59,11 +66,14 @@ import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.fun.SqlStdO import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.parser.SqlParser; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.parser.SqlParserImplFactory; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.util.ChainedSqlOperatorTable; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlUserDefinedFunction; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.FrameworkConfig; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.Frameworks; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSets; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** ZetaSQLQueryPlanner. */ @SuppressWarnings({ @@ -71,6 +81,8 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) }) public class ZetaSQLQueryPlanner implements QueryPlanner { + private static final Logger LOG = LoggerFactory.getLogger(ZetaSQLQueryPlanner.class); + private final ZetaSQLPlannerImpl plannerImpl; public ZetaSQLQueryPlanner(FrameworkConfig config) { @@ -104,6 +116,30 @@ public class ZetaSQLQueryPlanner implements QueryPlanner { return modifyRuleSetsForZetaSql(BeamRuleSets.getRuleSets()); } + /** Returns true if the argument contains any user-defined Java functions. */ + static boolean hasUdfInProjects(RelOptRuleCall x) { + List<RelNode> resList = x.getRelList(); + for (RelNode relNode : resList) { + if (relNode instanceof LogicalCalc) { + LogicalCalc logicalCalc = (LogicalCalc) relNode; + for (RexNode rexNode : logicalCalc.getProgram().getExprList()) { + if (rexNode instanceof RexCall) { + RexCall call = (RexCall) rexNode; + if (call.getOperator() instanceof SqlUserDefinedFunction) { + SqlUserDefinedFunction udf = (SqlUserDefinedFunction) call.op; + if (udf.function instanceof ZetaSqlScalarFunctionImpl) { + ZetaSqlScalarFunctionImpl scalarFunction = (ZetaSqlScalarFunctionImpl) udf.function; + return scalarFunction.functionGroup.equals( + SqlAnalyzer.USER_DEFINED_JAVA_SCALAR_FUNCTIONS); + } + } + } + } + } + } + return false; + } + private static Collection<RuleSet> modifyRuleSetsForZetaSql(Collection<RuleSet> ruleSets) { ImmutableList.Builder<RuleSet> ret = ImmutableList.builder(); for (RuleSet ruleSet : ruleSets) { @@ -123,6 +159,7 @@ public class ZetaSQLQueryPlanner implements QueryPlanner { continue; } else if (rule instanceof BeamCalcRule) { bd.add(BeamZetaSqlCalcRule.INSTANCE); + bd.add(BeamJavaUdfCalcRule.INSTANCE); } else if (rule instanceof BeamUnnestRule) { bd.add(BeamZetaSqlUnnestRule.INSTANCE); } else if (rule instanceof BeamUncollectRule) { @@ -196,7 +233,9 @@ public class ZetaSQLQueryPlanner implements QueryPlanner { RelMetadataQuery.THREAD_PROVIDERS.set( JaninoRelMetadataProvider.of(root.rel.getCluster().getMetadataProvider())); root.rel.getCluster().invalidateMetadataQuery(); - return (BeamRelNode) plannerImpl.transform(0, desiredTraits, root.rel); + BeamRelNode beamRelNode = (BeamRelNode) plannerImpl.transform(0, desiredTraits, root.rel); + LOG.info("BEAMPlan>\n" + RelOptUtil.toString(beamRelNode)); + return beamRelNode; } private static FrameworkConfig defaultConfig(