yuxiqian commented on code in PR #3642:
URL: https://github.com/apache/flink-cdc/pull/3642#discussion_r1799217964


##########
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java:
##########
@@ -85,11 +80,45 @@ public DataStream<Event> translatePostTransform(
             }
         }
         postTransformFunctionBuilder.addTimezone(timezone);
+
+        List<UdfDef> allFunctions = new ArrayList<>(udfFunctions);
+        allFunctions.addAll(convertModelsToUdfs(models));
+
         postTransformFunctionBuilder.addUdfFunctions(
-                udfFunctions.stream()
-                        .map(udf -> Tuple2.of(udf.getName(), 
udf.getClasspath()))
-                        .collect(Collectors.toList()));
+                
allFunctions.stream().map(this::udfDefToTuple2).collect(Collectors.toList()));
         return input.transform(
                 "Transform:Data", new EventTypeInfo(), 
postTransformFunctionBuilder.build());
     }
+
+    private List<UdfDef> convertModelsToUdfs(List<ModelDef> models) {
+        return 
models.stream().map(this::modelToUdf).collect(Collectors.toList());
+    }

Review Comment:
   This conversion is quite worrying, since obviously `ModelDef` carries more 
information (the params), and the conversion could not be done without some 
hacky approaches like pasting params into the name field.
   
   Modifying the `addUdfFunctions` function, making it accepting a more 
universal data structure (instead of `Name - Classpath` `Tuple2`) makes more 
sense to me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to