ruanhang1993 commented on code in PR #3465:
URL: https://github.com/apache/flink-cdc/pull/3465#discussion_r1694814738


##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java:
##########
@@ -118,7 +119,7 @@ public void testJaninoTimestampFunction() throws InvocationTargetException {
         List<Object> params = Arrays.asList(epochTime);
         ExpressionEvaluator expressionEvaluator =
                 JaninoCompiler.compileExpression(
-                        JaninoCompiler.loadSystemFunction(expression),

Review Comment:
   Please add some unit tests and e2e tests for the UDF feature.
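   
   For reference, a minimal sketch of what such a unit test could look like at the Janino level; the expression and wiring below are illustrative, and the real test should exercise the new UDF path in JaninoCompiler:
   
   ```java
   import org.codehaus.janino.ExpressionEvaluator;
   import org.junit.Assert;
   import org.junit.Test;
   
   public class UdfExpressionSketchTest {
       @Test
       public void testUdfStyleExpression() throws Exception {
           ExpressionEvaluator evaluator = new ExpressionEvaluator();
           // Parameter names/types mirror the columns the expression refers to.
           evaluator.setParameters(new String[] {"product_name"}, new Class[] {String.class});
           evaluator.setExpressionType(String.class);
           evaluator.cook("product_name.substring(0, 3)");
           Assert.assertEquals("fli", evaluator.evaluate(new Object[] {"flink"}));
       }
   }
   ```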



##########
docs/content/docs/core-concept/data-pipeline.md:
##########
@@ -79,13 +79,30 @@ We could use following yaml file to define a complicated Data Pipeline describin
      fenodes: 127.0.0.1:8030
      username: root
      password: ""
+
+   transform:
+     - source-table: adb.web_order01
+       projection: \*, UPPER(product_name) as product_name
+       filter: id > 10 AND order_id > 100
+       description: project fields and filter
+     - source-table: adb.web_order02
+       projection: \*, UPPER(product_name) as product_name
+       filter: id > 20 AND order_id > 200
+       description: project fields and filter
+
    route:
      - source-table: app_db.orders
        sink-table: ods_db.ods_orders
      - source-table: app_db.shipments
        sink-table: ods_db.ods_shipments
      - source-table: app_db.products
-       sink-table: ods_db.ods_products  
+       sink-table: ods_db.ods_products
+
+   udf:
+     - name: substring
+       classpath: com.example.functions.scalar.SubStringFunctionClass

Review Comment:
   Please add SubStringFunctionClass's code to the doc, and describe how to add it to the classpath.
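   
   For example, something along these lines; this is only a sketch, and the exact contract (which method Calcite's ScalarFunctionImpl resolves for the configured name) should match what this PR implements:
   
   ```java
   package com.example.functions.scalar;
   
   // Illustrative UDF class for the doc example; the method name follows the
   // `name: substring` entry in the pipeline yaml, per ScalarFunctionImpl.create(clazz, name).
   public class SubStringFunctionClass {
       public String substring(String value, int begin, int end) {
           if (value == null) {
               return null;
           }
           return value.substring(begin, Math.min(end, value.length()));
       }
   }
   ```
   
   The doc should also state how the class gets onto the classpath, for example by packaging it into a jar and placing that jar under the Flink `lib/` directory before submitting the pipeline.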



##########
docs/content/docs/core-concept/data-pipeline.md:
##########
@@ -79,13 +79,30 @@ We could use following yaml file to define a complicated Data Pipeline describin
      fenodes: 127.0.0.1:8030
      username: root
      password: ""
+
+   transform:
+     - source-table: adb.web_order01
+       projection: \*, UPPER(product_name) as product_name
+       filter: id > 10 AND order_id > 100
+       description: project fields and filter
+     - source-table: adb.web_order02
+       projection: \*, UPPER(product_name) as product_name
+       filter: id > 20 AND order_id > 200
+       description: project fields and filter
+
    route:
      - source-table: app_db.orders
        sink-table: ods_db.ods_orders
      - source-table: app_db.shipments
        sink-table: ods_db.ods_shipments
      - source-table: app_db.products
-       sink-table: ods_db.ods_products  
+       sink-table: ods_db.ods_products
+
+   udf:
+     - name: substring

Review Comment:
   Please add an example of how this UDF is used in this pipeline.
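   
   For example, something like the following in the transform block (illustrative; the argument semantics depend on the UDF's actual signature):
   
   ```yaml
   transform:
     - source-table: adb.web_order01
       projection: \*, substring(product_name, 0, 5) as product_prefix
       description: apply the registered substring udf in a projection
   ```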



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java:
##########
@@ -87,15 +96,34 @@ private static SqlParser getCalciteParser(String sql) {
                         .withLex(Lex.JAVA));
     }
 
-    private static RelNode sqlToRel(List<Column> columns, SqlNode sqlNode) {
+    private static RelNode sqlToRel(
+            List<Column> columns, SqlNode sqlNode, List<Tuple2<String, String>> udfs) {
         List<Column> columnsWithMetadata = copyFillMetadataColumn(sqlNode.toString(), columns);
         CalciteSchema rootSchema = CalciteSchema.createRootSchema(true);
+        SchemaPlus schema = rootSchema.plus();
         Map<String, Object> operand = new HashMap<>();
         operand.put("tableName", DEFAULT_TABLE);
         operand.put("columns", columnsWithMetadata);
         rootSchema.add(
                 DEFAULT_SCHEMA,
-                TransformSchemaFactory.INSTANCE.create(rootSchema.plus(), DEFAULT_SCHEMA, operand));
+                TransformSchemaFactory.INSTANCE.create(schema, DEFAULT_SCHEMA, operand));
+        List<SqlFunction> udfFunctions = new ArrayList<>();
+        for (Tuple2<String, String> udf : udfs) {
+            try {
+                ScalarFunction function = ScalarFunctionImpl.create(Class.forName(udf.f1), udf.f0);

Review Comment:
   Could users use their existing Flink SQL scalar functions here? I think we should support Flink SQL UDFs to reduce the difficulty of user migration.
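   
   For context, an existing Flink SQL scalar function typically looks like the sketch below (the standard org.apache.flink.table.functions.ScalarFunction with a public eval method), so supporting that contract directly would let users reuse such classes unchanged:
   
   ```java
   import org.apache.flink.table.functions.ScalarFunction;
   
   // A typical pre-existing Flink SQL UDF that users may already have.
   public class SubstringFunction extends ScalarFunction {
       public String eval(String s, Integer begin, Integer end) {
           return s == null ? null : s.substring(begin, Math.min(end, s.length()));
       }
   }
   ```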


