tombentley commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r429066772



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
##########
@@ -257,12 +273,25 @@ public boolean includeRecordDetailsInErrorLog() {
         final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
+
             try {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = getClass(prefix + "type").asSubclass(Transformation.class)
                         .getDeclaredConstructor().newInstance();
-                transformation.configure(originalsWithPrefix(prefix));
-                transformations.add(transformation);
+                Map<String, Object> configs = originalsWithPrefix(prefix);
+                Object predicateAlias = configs.remove("predicate");
+                Object negate = configs.remove("negate");
+                transformation.configure(configs);
+                if (predicateAlias != null) {
+                    String predicatePrefix = "predicates." + predicateAlias + ".";
+                    @SuppressWarnings("unchecked")
+                    Predicate<R> predicate = getClass(predicatePrefix + "type").asSubclass(Predicate.class)
+                            .getDeclaredConstructor().newInstance();
+                    predicate.configure(originalsWithPrefix(predicatePrefix));
+                    transformations.add(new PredicatedTransformation<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation));

Review comment:
       A more general question is "Why does `PredicatedTransformation` have a 
special constructor rather than use `configure(Map<String, ?>)`?" This arises 
because `PredicatedTransformation` is a bit special. In particular, it has to be 
passed an already configured `Predicate` and a `Transformation`, which you 
couldn't do with a normal `Transformation` (which can only be configured with the 
types supported by `ConfigDef`).
   
   I guess we could use `configure(Map<String, ?>)`, but then we have to 
instantiate a Map at this call site only to unpick it again in 
`PredicatedTransformation`, which feels like more work than just having a 
constructor. But if you prefer the consistency of using `configure()` I'm happy 
to do it.
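
   To make the trade-off concrete, here is a rough, self-contained sketch of the two options. It is not the code in this PR: the `Predicate` and `Transformation` interfaces are simplified stand-ins for the Connect ones, the class names `ViaConstructor` and `ViaConfigure` are hypothetical, the map keys `"predicate"`, `"negate"` and `"transformation"` are invented for illustration, and the `negate ^ predicate.test(record)` rule is my assumption about the intended behaviour.

```java
import java.util.HashMap;
import java.util.Map;

public class PredicatedSketch {

    // Simplified stand-ins for the Connect interfaces (assumption, not the real API).
    public interface Predicate<R> {
        boolean test(R record);
    }

    public interface Transformation<R> {
        R apply(R record);
    }

    // Option 1: the collaborators are passed in, already configured, via the constructor.
    public static class ViaConstructor<R> implements Transformation<R> {
        private final Predicate<R> predicate;
        private final boolean negate;
        private final Transformation<R> delegate;

        public ViaConstructor(Predicate<R> predicate, boolean negate, Transformation<R> delegate) {
            this.predicate = predicate;
            this.negate = negate;
            this.delegate = delegate;
        }

        @Override
        public R apply(R record) {
            // Apply the delegate only when the (possibly negated) predicate matches.
            return (negate ^ predicate.test(record)) ? delegate.apply(record) : record;
        }
    }

    // Option 2: the call site packs the objects into a Map, and configure() unpicks it again.
    public static class ViaConfigure<R> implements Transformation<R> {
        private Predicate<R> predicate;
        private boolean negate;
        private Transformation<R> delegate;

        @SuppressWarnings("unchecked")
        public void configure(Map<String, ?> configs) {
            // Hypothetical keys; the values are live objects, not ConfigDef-typed values.
            this.predicate = (Predicate<R>) configs.get("predicate");
            this.delegate = (Transformation<R>) configs.get("transformation");
            this.negate = Boolean.parseBoolean(String.valueOf(configs.get("negate")));
        }

        @Override
        public R apply(R record) {
            return (negate ^ predicate.test(record)) ? delegate.apply(record) : record;
        }
    }

    public static void main(String[] args) {
        Predicate<String> startsWithA = s -> s.startsWith("a");
        Transformation<String> upper = s -> s.toUpperCase();

        // Option 1 call site: a single constructor call.
        Transformation<String> viaCtor = new ViaConstructor<>(startsWithA, false, upper);

        // Option 2 call site: build a Map just so configure() can take it apart.
        Map<String, Object> configs = new HashMap<>();
        configs.put("predicate", startsWithA);
        configs.put("negate", false);
        configs.put("transformation", upper);
        ViaConfigure<String> viaConfigure = new ViaConfigure<>();
        viaConfigure.configure(configs);

        System.out.println(viaCtor.apply("apple") + " " + viaConfigure.apply("pear"));
    }
}
```

   In this sketch the `configure()` variant needs unchecked casts and a throwaway `Map`, and nothing stops a caller from using the object before `configure()` has run. That is the extra work I was referring to above.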




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

