ruanhang1993 commented on code in PR #3465: URL: https://github.com/apache/flink-cdc/pull/3465#discussion_r1694814738
########## flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java: ########## @@ -118,7 +119,7 @@ public void testJaninoTimestampFunction() throws InvocationTargetException { List<Object> params = Arrays.asList(epochTime); ExpressionEvaluator expressionEvaluator = JaninoCompiler.compileExpression( - JaninoCompiler.loadSystemFunction(expression), Review Comment: Please add some unit tests and e2e tests about the udf. ########## docs/content/docs/core-concept/data-pipeline.md: ########## @@ -79,13 +79,30 @@ We could use following yaml file to define a complicated Data Pipeline describin fenodes: 127.0.0.1:8030 username: root password: "" + + transform: + - source-table: adb.web_order01 + projection: \*, UPPER(product_name) as product_name + filter: id > 10 AND order_id > 100 + description: project fields and filter + - source-table: adb.web_order02 + projection: \*, UPPER(product_name) as product_name + filter: id > 20 AND order_id > 200 + description: project fields and filter + route: - source-table: app_db.orders sink-table: ods_db.ods_orders - source-table: app_db.shipments sink-table: ods_db.ods_shipments - source-table: app_db.products - sink-table: ods_db.ods_products + sink-table: ods_db.ods_products + + udf: + - name: substring + classpath: com.example.functions.scalar.SubStringFunctionClass Review Comment: Please add SubStringFunctionClass's code in the doc. And describe how to append them in the classpath. ########## docs/content/docs/core-concept/data-pipeline.md: ########## @@ -79,13 +79,30 @@ We could use following yaml file to define a complicated Data Pipeline describin fenodes: 127.0.0.1:8030 username: root password: "" + + transform: + - source-table: adb.web_order01 + projection: \*, UPPER(product_name) as product_name + filter: id > 10 AND order_id > 100 + description: project fields and filter + - source-table: adb.web_order02 + projection: \*, UPPER(product_name) as product_name + filter: id > 20 AND order_id > 200 + description: project fields and filter + route: - source-table: app_db.orders sink-table: ods_db.ods_orders - source-table: app_db.shipments sink-table: ods_db.ods_shipments - source-table: app_db.products - sink-table: ods_db.ods_products + sink-table: ods_db.ods_products + + udf: + - name: substring Review Comment: Add the usage of this udf in this pipeline. ########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java: ########## @@ -87,15 +96,34 @@ private static SqlParser getCalciteParser(String sql) { .withLex(Lex.JAVA)); } - private static RelNode sqlToRel(List<Column> columns, SqlNode sqlNode) { + private static RelNode sqlToRel( + List<Column> columns, SqlNode sqlNode, List<Tuple2<String, String>> udfs) { List<Column> columnsWithMetadata = copyFillMetadataColumn(sqlNode.toString(), columns); CalciteSchema rootSchema = CalciteSchema.createRootSchema(true); + SchemaPlus schema = rootSchema.plus(); Map<String, Object> operand = new HashMap<>(); operand.put("tableName", DEFAULT_TABLE); operand.put("columns", columnsWithMetadata); rootSchema.add( DEFAULT_SCHEMA, - TransformSchemaFactory.INSTANCE.create(rootSchema.plus(), DEFAULT_SCHEMA, operand)); + TransformSchemaFactory.INSTANCE.create(schema, DEFAULT_SCHEMA, operand)); + List<SqlFunction> udfFunctions = new ArrayList<>(); + for (Tuple2<String, String> udf : udfs) { + try { + ScalarFunction function = ScalarFunctionImpl.create(Class.forName(udf.f1), udf.f0); Review Comment: Could users use their existed flink sql's scalar function here? I think we should support the flink sql udfs to reduce the difficulty of user migration. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org