[ https://issues.apache.org/jira/browse/BEAM-5644?focusedWorklogId=195335&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-195335 ]
ASF GitHub Bot logged work on BEAM-5644: ---------------------------------------- Author: ASF GitHub Bot Created on: 06/Feb/19 20:25 Start Date: 06/Feb/19 20:25 Worklog Time Spent: 10m Work Description: amaliujia commented on pull request #7745: [BEAM-5644] make Planner configurable URL: https://github.com/apache/beam/pull/7745#discussion_r254437306 ########## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java ########## @@ -174,8 +106,110 @@ public void executeDdl(String sqlStatement) throws ParseException { public String explain(String sqlString) throws ParseException { try { return RelOptUtil.toString(planner.convertToBeamRel(sqlString)); - } catch (ValidationException | RelConversionException | SqlParseException e) { + } catch (Exception e) { throw new ParseException("Unable to parse statement", e); } } + + /** BeamSqlEnv's Builder. */ + public static class BeamSqlEnvBuilder { + private final JdbcConnection jdbcConnection; + private String plannerName = "CalcitePlanner"; + + public static BeamSqlEnvBuilder builder(TableProvider tableProvider) { + return new BeamSqlEnvBuilder(tableProvider); + } + + private BeamSqlEnvBuilder(TableProvider tableProvider) { + jdbcConnection = JdbcDriver.connect(tableProvider); + } + + public BeamSqlEnvBuilder registerBuiltinUdf(Map<String, List<Method>> methods) { + for (Map.Entry<String, List<Method>> entry : methods.entrySet()) { + for (Method method : entry.getValue()) { + jdbcConnection.getCurrentSchemaPlus().add(entry.getKey(), UdfImpl.create(method)); + } + } + + return this; + } + + public BeamSqlEnvBuilder addSchema(String name, TableProvider tableProvider) { + jdbcConnection.setSchema(name, tableProvider); + + return this; + } + + public BeamSqlEnvBuilder setCurrentSchema(String name) { + try { + jdbcConnection.setSchema(name); + } catch (SQLException e) { + throw new RuntimeException(e); + } + + return this; + } + + /** Register a UDF function which can be used in 
SQL expression. */ + public BeamSqlEnvBuilder registerUdf(String functionName, Class<?> clazz, String method) { + jdbcConnection.getCurrentSchemaPlus().add(functionName, UdfImpl.create(clazz, method)); + + return this; + } + + /** Register a UDF function which can be used in SQL expression. */ + public BeamSqlEnvBuilder registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) { + return registerUdf(functionName, clazz, BeamSqlUdf.UDF_METHOD); + } + + public BeamSqlEnvBuilder registerUdf(String functionName, SerializableFunction sfn) { + return registerUdf(functionName, sfn.getClass(), "apply"); + } + + /** + * Register a UDAF function which can be used in GROUP-BY expression. See {@link + * org.apache.beam.sdk.transforms.Combine.CombineFn} on how to implement a UDAF. + */ + public BeamSqlEnvBuilder registerUdaf(String functionName, Combine.CombineFn combineFn) { + jdbcConnection.getCurrentSchemaPlus().add(functionName, new UdafImpl(combineFn)); + + return this; + } + + /** Load all UDF/UDAF from {@link UdfUdafProvider}. */ + public BeamSqlEnvBuilder loadUdfUdafFromProvider() { + ServiceLoader.<UdfUdafProvider>load(UdfUdafProvider.class) + .forEach( + ins -> { + ins.getBeamSqlUdfs().forEach((udfName, udfClass) -> registerUdf(udfName, udfClass)); + ins.getSerializableFunctionUdfs() + .forEach((udfName, udfFn) -> registerUdf(udfName, udfFn)); + ins.getUdafs().forEach((udafName, udafFn) -> registerUdaf(udafName, udafFn)); + }); + + return this; + } + + public BeamSqlEnvBuilder loadBeamBuiltinFunctions() { + for (BeamBuiltinFunctionProvider provider : + ServiceLoader.load(BeamBuiltinFunctionProvider.class)) { + registerBuiltinUdf(provider.getBuiltinMethods()); + } + + return this; + } + + public BeamSqlEnvBuilder setPlannerName(String name) { + plannerName = name; + return this; + } + + public BeamSqlEnv build() { + if (plannerName.equals("CalcitePlanner")) { Review comment: I am not using reflection because different planner needs different initialization. 
Some other planners might not need FrameworksConfig. Therefore, for known planners it's better to construct them explicitly (otherwise, which constructor would we expect to invoke via reflection?). Allowing a separate planner to be registered via reflection makes sense if there is an immediate use case (e.g., someone wants to register a new planner on the classpath and knows which constructor it has). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 195335) Time Spent: 7.5h (was: 7h 20m) > make Planner configurable > -------------------------- > > Key: BEAM-5644 > URL: https://issues.apache.org/jira/browse/BEAM-5644 > Project: Beam > Issue Type: New Feature > Components: dsl-sql > Reporter: Rui Wang > Assignee: Rui Wang > Priority: Major > Time Spent: 7.5h > Remaining Estimate: 0h > > We can make the planner configurable here: > [BeamQueryPlanner.java#L145|https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java#L145] > > By doing so, we can have different planner implementations to support > different SQL dialects. -- This message was sent by Atlassian JIRA (v7.6.3#76005)