This is an automated email from the ASF dual-hosted git repository. ibzib pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 23ba50b [BEAM-10943] Add builtin functions in the Calcite planner. new 3a9591f Merge pull request #14222 from ibzib/BEAM-10943-builtins 23ba50b is described below commit 23ba50b1d1d6d7d772888416c0fbb5ba32c88de6 Author: Kyle Weaver <kcwea...@google.com> AuthorDate: Fri Mar 12 14:19:37 2021 -0800 [BEAM-10943] Add builtin functions in the Calcite planner. This prevents the builtin functions, which aren't used in ZetaSQL, from being added to the schema when using the ZetaSQL planner. --- .../beam/sdk/extensions/sql/SqlTransform.java | 1 - .../beam/sdk/extensions/sql/impl/BeamSqlEnv.java | 37 ++-------------------- .../extensions/sql/impl/CalciteQueryPlanner.java | 15 +++++++++ 3 files changed, 18 insertions(+), 35 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java index e7d49dc..a73cfac 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java @@ -143,7 +143,6 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>> BeamSqlEnvBuilder sqlEnvBuilder = BeamSqlEnv.builder(metaTableProvider); // TODO: validate duplicate functions. - sqlEnvBuilder.autoLoadBuiltinFunctions(); registerFunctions(sqlEnvBuilder); // Load automatic table providers before user ones so the user ones will cause a conflict if diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java index cbe3224..848135b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java @@ -19,13 +19,11 @@ package org.apache.beam.sdk.extensions.sql.impl; import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkNotNull; -import java.lang.reflect.Method; import java.sql.SQLException; import java.util.AbstractMap.SimpleEntry; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.ServiceLoader; import java.util.Set; @@ -35,7 +33,6 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlUdf; import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters; import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; -import org.apache.beam.sdk.extensions.sql.impl.udf.BeamBuiltinFunctionProvider; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider; import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; @@ -148,7 +145,6 @@ public class BeamSqlEnv { private String currentSchemaName; private Map<String, TableProvider> schemaMap; private Set<Map.Entry<String, Function>> functionSet; - private boolean autoLoadBuiltinFunctions; private boolean autoLoadUdfs; private PipelineOptions pipelineOptions; private Collection<RuleSet> ruleSets; @@ -161,7 +157,6 @@ public class BeamSqlEnv { schemaMap = new HashMap<>(); functionSet = new HashSet<>(); autoLoadUdfs = false; - autoLoadBuiltinFunctions = false; pipelineOptions = null; ruleSets = BeamRuleSets.getRuleSets(); } @@ -219,12 +214,6 @@ public class BeamSqlEnv { return this; } - /** Load Beam SQL built-in functions defined in {@link BeamBuiltinFunctionProvider}. */ - public BeamSqlEnvBuilder autoLoadBuiltinFunctions() { - autoLoadBuiltinFunctions = true; - return this; - } - public BeamSqlEnvBuilder setQueryPlannerClassName(String name) { queryPlannerClassName = name; return this; @@ -247,14 +236,13 @@ public class BeamSqlEnv { configureSchemas(jdbcConnection); - loadBeamBuiltinFunctions(); + QueryPlanner planner = instantiatePlanner(jdbcConnection, ruleSets); + // The planner may choose to add its own builtin functions to the schema, so load user-defined + // functions second, in case there's a conflict. loadUdfs(); - addUdfsUdafs(jdbcConnection); - QueryPlanner planner = instantiatePlanner(jdbcConnection, ruleSets); - return new BeamSqlEnv(jdbcConnection, planner); } @@ -275,25 +263,6 @@ public class BeamSqlEnv { } } - private void loadBeamBuiltinFunctions() { - if (!autoLoadBuiltinFunctions) { - return; - } - - for (BeamBuiltinFunctionProvider provider : - ServiceLoader.load(BeamBuiltinFunctionProvider.class)) { - loadBuiltinUdf(provider.getBuiltinMethods()); - } - } - - private void loadBuiltinUdf(Map<String, List<Method>> methods) { - for (Map.Entry<String, List<Method>> entry : methods.entrySet()) { - for (Method method : entry.getValue()) { - functionSet.add(new SimpleEntry<>(entry.getKey(), UdfImpl.create(method))); - } - } - } - private void loadUdfs() { if (!autoLoadUdfs) { return; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java index b24b973..ff5deb4 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java @@ -17,9 +17,11 @@ */ package org.apache.beam.sdk.extensions.sql.impl; +import java.lang.reflect.Method; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.ServiceLoader; import java.util.stream.Collectors; import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.Factory; import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters.Kind; @@ -27,6 +29,7 @@ import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; import org.apache.beam.sdk.extensions.sql.impl.planner.RelMdNodeStats; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; +import org.apache.beam.sdk.extensions.sql.impl.udf.BeamBuiltinFunctionProvider; import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.config.CalciteConnectionConfig; import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.jdbc.CalciteSchema; @@ -92,8 +95,20 @@ public class CalciteQueryPlanner implements QueryPlanner { @Override public QueryPlanner createPlanner( JdbcConnection jdbcConnection, Collection<RuleSet> ruleSets) { + loadBuiltinFunctions(jdbcConnection); return new CalciteQueryPlanner(jdbcConnection, ruleSets); } + + private void loadBuiltinFunctions(JdbcConnection jdbcConnection) { + for (BeamBuiltinFunctionProvider provider : + ServiceLoader.load(BeamBuiltinFunctionProvider.class)) { + for (Map.Entry<String, List<Method>> entry : provider.getBuiltinMethods().entrySet()) { + for (Method method : entry.getValue()) { + jdbcConnection.getCurrentSchemaPlus().add(entry.getKey(), UdfImpl.create(method)); + } + } + } + } }; public FrameworkConfig defaultConfig(JdbcConnection connection, Collection<RuleSet> ruleSets) {