[ 
https://issues.apache.org/jira/browse/BEAM-5644?focusedWorklogId=164461&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-164461
 ]

ASF GitHub Bot logged work on BEAM-5644:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Nov/18 18:18
            Start Date: 09/Nov/18 18:18
    Worklog Time Spent: 10m 
      Work Description: amaliujia closed pull request #6598: [BEAM-5644] make 
planner impl configurable
URL: https://github.com/apache/beam/pull/6598
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index eb32d2a9a36..bfac942c9e3 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -56,7 +56,8 @@ public void execute(String sqlString) throws ParseException {
       env.executeDdl(sqlString);
     } else {
       PipelineOptions options =
-          
BeamEnumerableConverter.createPipelineOptions(env.getPipelineOptions());
+          BeamEnumerableConverter.createPipelineOptions(
+              env.getPipelineOptionsMapFromBeamCalciteSchema());
       options.setJobName("BeamPlanCreator");
       Pipeline pipeline = Pipeline.create(options);
       BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery(sqlString));
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
index ac9a6a17eba..f08589bdd26 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
@@ -26,6 +26,7 @@
 import java.util.Map;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
 import org.apache.beam.sdk.transforms.Combine;
@@ -91,6 +92,9 @@
 
     registerFunctions(sqlEnv);
 
+    // overwrite BeamSQL's planner constructor.
+    
sqlEnv.setOptions(input.getPipeline().getOptions().as(BeamSqlPipelineOptions.class));
+
     return BeamSqlRelUtils.toPCollection(input.getPipeline(), 
sqlEnv.parseQuery(queryString()));
   }
 
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlConstants.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlConstants.java
new file mode 100644
index 00000000000..968280a0d01
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlConstants.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+/** BeamSql constants. */
+public class BeamSqlConstants {
+  public static final String DEFAULT_BEAM_SQL_PLANNER_IMPL_CLASS_NAME =
+      "org.apache.calcite.prepare.PlannerImpl";
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEngine.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEngine.java
new file mode 100644
index 00000000000..e33bac82e40
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEngine.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.calcite.plan.RelOptPlanner.CannotPlanException;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.Planner;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The core component to handle through a SQL statement, from explain 
execution plan, to generate a
+ * Beam pipeline, and more.
+ */
+class BeamSqlEngine {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BeamSqlEngine.class);
+
+  Planner plannerImpl;
+
+  private BeamSqlEngine(Planner plannerImpl) {
+    this.plannerImpl = plannerImpl;
+  }
+
+  public static BeamSqlEngine create(Planner plannerImpl) {
+    return new BeamSqlEngine(plannerImpl);
+  }
+
+  /** Parse input SQL query, and return a {@link SqlNode} as grammar tree. */
+  public SqlNode parse(String sqlStatement) throws SqlParseException {
+    SqlNode parsed;
+    try {
+      parsed = plannerImpl.parse(sqlStatement);
+    } finally {
+      plannerImpl.close();
+    }
+    return parsed;
+  }
+
+  /** It parses and validate the input query, then convert into a {@link 
BeamRelNode} tree. */
+  public BeamRelNode convertToBeamRel(String sqlStatement)
+      throws ValidationException, RelConversionException, SqlParseException, 
CannotPlanException {
+    BeamRelNode beamRelNode;
+    try {
+      SqlNode parsed = plannerImpl.parse(sqlStatement);
+      SqlNode validated = plannerImpl.validate(parsed);
+      LOG.info("SQL:\n" + validated);
+
+      // root of original logical plan
+      RelRoot root = plannerImpl.rel(validated);
+      LOG.info("SQLPlan>\n" + RelOptUtil.toString(root.rel));
+
+      RelTraitSet desiredTraits =
+          root.rel
+              .getTraitSet()
+              .replace(BeamLogicalConvention.INSTANCE)
+              .replace(root.collation)
+              .simplify();
+
+      // beam physical plan
+      beamRelNode = (BeamRelNode) plannerImpl.transform(0, desiredTraits, 
root.rel);
+      LOG.info("BEAMPlan>\n" + RelOptUtil.toString(beamRelNode));
+    } finally {
+      plannerImpl.close();
+    }
+    return beamRelNode;
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index 44eed399130..da9350f2e87 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -26,6 +26,7 @@
 import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.calcite.jdbc.CalciteConnection;
@@ -36,6 +37,8 @@
 import org.apache.calcite.schema.impl.ScalarFunctionImpl;
 import org.apache.calcite.sql.SqlExecutableStatement;
 import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Planner;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
 
@@ -48,12 +51,16 @@
 public class BeamSqlEnv {
   final CalciteConnection connection;
   final SchemaPlus defaultSchema;
-  final BeamQueryPlanner planner;
+  final BeamSqlEngine sqlEngine;
+
+  private BeamSqlPipelineOptions options;
 
   private BeamSqlEnv(TableProvider tableProvider) {
     connection = JdbcDriver.connect(tableProvider);
     defaultSchema = JdbcDriver.getDefaultSchema(connection);
-    planner = new BeamQueryPlanner(connection);
+    FrameworkConfig config = CalciteFrameworkConfigFactory.create(connection);
+    Planner plannerImpl = PlannerFactory.create(config, 
getBeamSqlPipelineOptions());
+    sqlEngine = BeamSqlEngine.create(plannerImpl);
   }
 
   public static BeamSqlEnv readOnly(String tableType, Map<String, 
BeamSqlTable> tables) {
@@ -101,7 +108,7 @@ public void registerUdaf(String functionName, 
Combine.CombineFn combineFn) {
 
   public BeamRelNode parseQuery(String query) throws ParseException {
     try {
-      return planner.convertToBeamRel(query);
+      return sqlEngine.convertToBeamRel(query);
     } catch (ValidationException | RelConversionException | SqlParseException 
e) {
       throw new ParseException(String.format("Unable to parse query %s", 
query), e);
     }
@@ -109,7 +116,7 @@ public BeamRelNode parseQuery(String query) throws 
ParseException {
 
   public boolean isDdl(String sqlStatement) throws ParseException {
     try {
-      return planner.parse(sqlStatement) instanceof SqlExecutableStatement;
+      return sqlEngine.parse(sqlStatement) instanceof SqlExecutableStatement;
     } catch (SqlParseException e) {
       throw new ParseException("Unable to parse statement", e);
     }
@@ -117,7 +124,7 @@ public boolean isDdl(String sqlStatement) throws 
ParseException {
 
   public void executeDdl(String sqlStatement) throws ParseException {
     try {
-      SqlExecutableStatement ddl = (SqlExecutableStatement) 
planner.parse(sqlStatement);
+      SqlExecutableStatement ddl = (SqlExecutableStatement) 
sqlEngine.parse(sqlStatement);
       ddl.execute(getContext());
     } catch (SqlParseException e) {
       throw new ParseException("Unable to parse DDL statement", e);
@@ -128,15 +135,27 @@ public void executeDdl(String sqlStatement) throws 
ParseException {
     return connection.createPrepareContext();
   }
 
-  public Map<String, String> getPipelineOptions() {
+  public Map<String, String> getPipelineOptionsMapFromBeamCalciteSchema() {
     return ((BeamCalciteSchema) 
CalciteSchema.from(defaultSchema).schema).getPipelineOptions();
   }
 
   public String explain(String sqlString) throws ParseException {
     try {
-      return RelOptUtil.toString(planner.convertToBeamRel(sqlString));
+      return RelOptUtil.toString(sqlEngine.convertToBeamRel(sqlString));
     } catch (ValidationException | RelConversionException | SqlParseException 
e) {
       throw new ParseException("Unable to parse statement", e);
     }
   }
+
+  public void setOptions(BeamSqlPipelineOptions options) {
+    this.options = options;
+  }
+
+  private BeamSqlPipelineOptions getBeamSqlPipelineOptions() {
+    if (options == null) {
+      options = 
PipelineOptionsFactory.create().as(BeamSqlPipelineOptions.class);
+    }
+
+    return options;
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptions.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptions.java
new file mode 100644
index 00000000000..fa9a77cf5b0
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptions.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/** Options used to configure BeamSQL. */
+public interface BeamSqlPipelineOptions extends PipelineOptions {
+
+  @Description("BeamSQL planner implementation class name.")
+  @Default.String(BeamSqlConstants.DEFAULT_BEAM_SQL_PLANNER_IMPL_CLASS_NAME)
+  String getPlannerImplClassName();
+
+  void setPlannerImplClassName(String className);
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptionsRegistrar.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptionsRegistrar.java
new file mode 100644
index 00000000000..df45b6b4d57
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptionsRegistrar.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+
+/** {@link AutoService} registrar for {@link BeamSqlPipelineOptions}. */
+@AutoService(PipelineOptionsRegistrar.class)
+public class BeamSqlPipelineOptionsRegistrar implements 
PipelineOptionsRegistrar {
+
+  @Override
+  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+    return ImmutableList.of(BeamSqlPipelineOptions.class);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteFrameworkConfigFactory.java
similarity index 50%
rename from 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java
rename to 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteFrameworkConfigFactory.java
index bdee34a7a96..7330253db09 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteFrameworkConfigFactory.java
@@ -19,45 +19,25 @@
 
 import com.google.common.collect.ImmutableList;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
-import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
-import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.calcite.jdbc.CalciteConnection;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.ConventionTraitDef;
-import org.apache.calcite.plan.RelOptPlanner.CannotPlanException;
-import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.plan.RelTraitDef;
-import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.prepare.CalciteCatalogReader;
-import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
-import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.calcite.sql.parser.SqlParserImplFactory;
 import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.tools.Planner;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-/**
- * The core component to handle through a SQL statement, from explain 
execution plan, to generate a
- * Beam pipeline.
- */
-class BeamQueryPlanner {
-  private static final Logger LOG = 
LoggerFactory.getLogger(BeamQueryPlanner.class);
-
-  private final FrameworkConfig config;
-
-  BeamQueryPlanner(CalciteConnection connection) {
+/** Factory that creates {@link FrameworkConfig}. */
+class CalciteFrameworkConfigFactory {
+  public static FrameworkConfig create(CalciteConnection connection) {
     final CalciteConnectionConfig config = connection.config();
     final SqlParser.ConfigBuilder parserConfig =
         SqlParser.configBuilder()
@@ -86,62 +66,15 @@
     final SqlOperatorTable opTab0 =
         connection.config().fun(SqlOperatorTable.class, 
SqlStdOperatorTable.instance());
 
-    this.config =
-        Frameworks.newConfigBuilder()
-            .parserConfig(parserConfig.build())
-            .defaultSchema(defaultSchema)
-            .traitDefs(traitDefs)
-            .context(Contexts.of(connection.config()))
-            .ruleSets(BeamRuleSets.getRuleSets())
-            .costFactory(null)
-            .typeSystem(connection.getTypeFactory().getTypeSystem())
-            .operatorTable(ChainedSqlOperatorTable.of(opTab0, catalogReader))
-            .build();
-  }
-
-  /** Parse input SQL query, and return a {@link SqlNode} as grammar tree. */
-  public SqlNode parse(String sqlStatement) throws SqlParseException {
-    Planner planner = getPlanner();
-    SqlNode parsed;
-    try {
-      parsed = planner.parse(sqlStatement);
-    } finally {
-      planner.close();
-    }
-    return parsed;
-  }
-
-  /** It parses and validate the input query, then convert into a {@link 
BeamRelNode} tree. */
-  public BeamRelNode convertToBeamRel(String sqlStatement)
-      throws ValidationException, RelConversionException, SqlParseException, 
CannotPlanException {
-    BeamRelNode beamRelNode;
-    Planner planner = getPlanner();
-    try {
-      SqlNode parsed = planner.parse(sqlStatement);
-      SqlNode validated = planner.validate(parsed);
-      LOG.info("SQL:\n" + validated);
-
-      // root of original logical plan
-      RelRoot root = planner.rel(validated);
-      LOG.info("SQLPlan>\n" + RelOptUtil.toString(root.rel));
-
-      RelTraitSet desiredTraits =
-          root.rel
-              .getTraitSet()
-              .replace(BeamLogicalConvention.INSTANCE)
-              .replace(root.collation)
-              .simplify();
-
-      // beam physical plan
-      beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, 
root.rel);
-      LOG.info("BEAMPlan>\n" + RelOptUtil.toString(beamRelNode));
-    } finally {
-      planner.close();
-    }
-    return beamRelNode;
-  }
-
-  private Planner getPlanner() {
-    return Frameworks.getPlanner(config);
+    return Frameworks.newConfigBuilder()
+        .parserConfig(parserConfig.build())
+        .defaultSchema(defaultSchema)
+        .traitDefs(traitDefs)
+        .context(Contexts.of(connection.config()))
+        .ruleSets(BeamRuleSets.getRuleSets())
+        .costFactory(null)
+        .typeSystem(connection.getTypeFactory().getTypeSystem())
+        .operatorTable(ChainedSqlOperatorTable.of(opTab0, catalogReader))
+        .build();
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/PlannerFactory.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/PlannerFactory.java
new file mode 100644
index 00000000000..6f684cb9209
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/PlannerFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import org.apache.calcite.tools.FrameworkConfig;
+import org.apache.calcite.tools.Frameworks;
+import org.apache.calcite.tools.Planner;
+
+/** Factory that creates an instance of {@link Planner}. */
+class PlannerFactory {
+  public static Planner create(FrameworkConfig config, BeamSqlPipelineOptions 
options) {
+    if (options
+        .getPlannerImplClassName()
+        .equals(BeamSqlConstants.DEFAULT_BEAM_SQL_PLANNER_IMPL_CLASS_NAME)) {
+      return Frameworks.getPlanner(config);
+    } else { // using reflection to create a planner.
+      Constructor constructor = 
getPlannerConstructorFromPipelineOptions(options);
+      return constructPlanner(config, constructor);
+    }
+  }
+
+  private static Constructor getPlannerConstructorFromPipelineOptions(
+      BeamSqlPipelineOptions options) {
+    String plannerImplClassName = null;
+    try {
+      plannerImplClassName = options.getPlannerImplClassName();
+      return 
Class.forName(plannerImplClassName).getConstructor(FrameworkConfig.class);
+    } catch (NoSuchMethodException e) {
+      throw new RuntimeException(
+          "Couldn't find a constructor of \""
+              + options.getPlannerImplClassName()
+              + "\" that accepts FrameworkConfig as a parameter.",
+          e);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(
+          "Couldn't find class \""
+              + options.getPlannerImplClassName()
+              + "\" that is specified in the pipeline options",
+          e);
+    }
+  }
+
+  private static Planner constructPlanner(FrameworkConfig config, Constructor 
plannerConstructor) {
+    try {
+      return (Planner) plannerConstructor.newInstance(config);
+    } catch (InstantiationException | IllegalAccessException | 
InvocationTargetException e) {
+      throw new IllegalArgumentException(
+          "Using an illegal plannerImplConstructor: " + 
plannerConstructor.toString(), e);
+    }
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptionsRegistrarTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptionsRegistrarTest.java
new file mode 100644
index 00000000000..eb6cfa2cf29
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptionsRegistrarTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BeamSqlPipelineOptionsRegistrarTest}. */
+@RunWith(JUnit4.class)
+public class BeamSqlPipelineOptionsRegistrarTest {
+
+  @Test
+  public void testServiceLoader() {
+    for (PipelineOptionsRegistrar registrar : 
ServiceLoader.load(PipelineOptionsRegistrar.class)) {
+      if (registrar instanceof BeamSqlPipelineOptionsRegistrar) {
+        assertThat(
+            registrar.getPipelineOptions(),
+            Matchers.<Class<?>>contains(BeamSqlPipelineOptions.class));
+        return;
+      }
+    }
+
+    fail("Expected to find " + BeamSqlPipelineOptionsRegistrar.class);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptionsTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptionsTest.java
new file mode 100644
index 00000000000..c86dbc73ffa
--- /dev/null
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptionsTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BeamSqlPipelineOptionsTest}. */
+@RunWith(JUnit4.class)
+public class BeamSqlPipelineOptionsTest {
+
+  @Test
+  public void testPlannerImplClassName() {
+    BeamSqlPipelineOptions options =
+        PipelineOptionsFactory.create().as(BeamSqlPipelineOptions.class);
+    assertEquals("org.apache.calcite.prepare.PlannerImpl", 
options.getPlannerImplClassName());
+
+    options.setPlannerImplClassName("org.apache.test");
+    assertEquals("org.apache.test", options.getPlannerImplClassName());
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 164461)
    Time Spent: 4h 50m  (was: 4h 40m)

> make Planner configurable 
> --------------------------
>
>                 Key: BEAM-5644
>                 URL: https://issues.apache.org/jira/browse/BEAM-5644
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Rui Wang
>            Assignee: Rui Wang
>            Priority: Major
>          Time Spent: 4h 50m
>  Remaining Estimate: 0h
>
> We can make the planner configurable here: 
> [BeamQueryPlanner.java#L145|https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java#L145]
>  
> By doing so, we can have different planner implementations to support 
> different SQL dialects.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to