[ 
https://issues.apache.org/jira/browse/BEAM-5644?focusedWorklogId=195335&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-195335
 ]

ASF GitHub Bot logged work on BEAM-5644:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Feb/19 20:25
            Start Date: 06/Feb/19 20:25
    Worklog Time Spent: 10m 
      Work Description: amaliujia commented on pull request #7745: [BEAM-5644] 
make Planner configurable
URL: https://github.com/apache/beam/pull/7745#discussion_r254437306
 
 

 ##########
 File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
 ##########
 @@ -174,8 +106,110 @@ public void executeDdl(String sqlStatement) throws 
ParseException {
   public String explain(String sqlString) throws ParseException {
     try {
       return RelOptUtil.toString(planner.convertToBeamRel(sqlString));
-    } catch (ValidationException | RelConversionException | SqlParseException 
e) {
+    } catch (Exception e) {
       throw new ParseException("Unable to parse statement", e);
     }
   }
+
+  /** BeamSqlEnv's Builder. */
+  public static class BeamSqlEnvBuilder {
+    private final JdbcConnection jdbcConnection;
+    private String plannerName = "CalcitePlanner";
+
+    public static BeamSqlEnvBuilder builder(TableProvider tableProvider) {
+      return new BeamSqlEnvBuilder(tableProvider);
+    }
+
+    private BeamSqlEnvBuilder(TableProvider tableProvider) {
+      jdbcConnection = JdbcDriver.connect(tableProvider);
+    }
+
+    public BeamSqlEnvBuilder registerBuiltinUdf(Map<String, List<Method>> 
methods) {
+      for (Map.Entry<String, List<Method>> entry : methods.entrySet()) {
+        for (Method method : entry.getValue()) {
+          jdbcConnection.getCurrentSchemaPlus().add(entry.getKey(), 
UdfImpl.create(method));
+        }
+      }
+
+      return this;
+    }
+
+    public BeamSqlEnvBuilder addSchema(String name, TableProvider 
tableProvider) {
+      jdbcConnection.setSchema(name, tableProvider);
+
+      return this;
+    }
+
+    public BeamSqlEnvBuilder setCurrentSchema(String name) {
+      try {
+        jdbcConnection.setSchema(name);
+      } catch (SQLException e) {
+        throw new RuntimeException(e);
+      }
+
+      return this;
+    }
+
+    /** Register a UDF function which can be used in SQL expression. */
+    public BeamSqlEnvBuilder registerUdf(String functionName, Class<?> clazz, 
String method) {
+      jdbcConnection.getCurrentSchemaPlus().add(functionName, 
UdfImpl.create(clazz, method));
+
+      return this;
+    }
+
+    /** Register a UDF function which can be used in SQL expression. */
+    public BeamSqlEnvBuilder registerUdf(String functionName, Class<? extends 
BeamSqlUdf> clazz) {
+      return registerUdf(functionName, clazz, BeamSqlUdf.UDF_METHOD);
+    }
+
+    public BeamSqlEnvBuilder registerUdf(String functionName, 
SerializableFunction sfn) {
+      return registerUdf(functionName, sfn.getClass(), "apply");
+    }
+
+    /**
+     * Register a UDAF function which can be used in GROUP-BY expression. See 
{@link
+     * org.apache.beam.sdk.transforms.Combine.CombineFn} on how to implement a 
UDAF.
+     */
+    public BeamSqlEnvBuilder registerUdaf(String functionName, 
Combine.CombineFn combineFn) {
+      jdbcConnection.getCurrentSchemaPlus().add(functionName, new 
UdafImpl(combineFn));
+
+      return this;
+    }
+
+    /** Load all UDF/UDAF from {@link UdfUdafProvider}. */
+    public BeamSqlEnvBuilder loadUdfUdafFromProvider() {
+      ServiceLoader.<UdfUdafProvider>load(UdfUdafProvider.class)
+          .forEach(
+              ins -> {
+                ins.getBeamSqlUdfs().forEach((udfName, udfClass) -> 
registerUdf(udfName, udfClass));
+                ins.getSerializableFunctionUdfs()
+                    .forEach((udfName, udfFn) -> registerUdf(udfName, udfFn));
+                ins.getUdafs().forEach((udafName, udafFn) -> 
registerUdaf(udafName, udafFn));
+              });
+
+      return this;
+    }
+
+    public BeamSqlEnvBuilder loadBeamBuiltinFunctions() {
+      for (BeamBuiltinFunctionProvider provider :
+          ServiceLoader.load(BeamBuiltinFunctionProvider.class)) {
+        registerBuiltinUdf(provider.getBuiltinMethods());
+      }
+
+      return this;
+    }
+
+    public BeamSqlEnvBuilder setPlannerName(String name) {
+      plannerName = name;
+      return this;
+    }
+
+    public BeamSqlEnv build() {
+      if (plannerName.equals("CalcitePlanner")) {
 
 Review comment:
   I am not using reflection because different planners need different 
initialization. Some other planners might not need FrameworksConfig. Therefore, 
for known planners, it's better to construct them explicitly (otherwise, what 
constructor would we expect from reflection?).
   
   Allowing separate planners to be registered by reflection makes sense if 
there is an immediate use case (someone wants to register a new planner on the 
classpath and they know what constructor it has).
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 195335)
    Time Spent: 7.5h  (was: 7h 20m)

> make Planner configurable 
> --------------------------
>
>                 Key: BEAM-5644
>                 URL: https://issues.apache.org/jira/browse/BEAM-5644
>             Project: Beam
>          Issue Type: New Feature
>          Components: dsl-sql
>            Reporter: Rui Wang
>            Assignee: Rui Wang
>            Priority: Major
>          Time Spent: 7.5h
>  Remaining Estimate: 0h
>
> We can make planner configurable here: 
> [BeamQueryPlanner.java#L145|https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java#L145]
>  
> By doing so, we can have different planner implementations to support 
> different SQL dialects.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to