[ https://issues.apache.org/jira/browse/KAFKA-9673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Randall Hauch resolved KAFKA-9673.
----------------------------------
    Fix Version/s: 2.6.0
         Reviewer: Konstantine Karantasis
       Resolution: Fixed

KIP-585 was approved by the 2.6.0 KIP freeze, and the PR was approved and merged to `trunk` before the 2.6.0 feature freeze.

> Conditionally apply SMTs
> ------------------------
>
>                 Key: KAFKA-9673
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9673
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Tom Bentley
>            Assignee: Tom Bentley
>            Priority: Major
>             Fix For: 2.6.0
>
>
> KAFKA-7052 ended up using IAE with a message, rather than NPE, in the case of
> an SMT being applied to a record lacking a given field. It's still not
> possible to apply an SMT conditionally, which is what things like Debezium
> really need in order to apply transformations only to non-schema change
> events.
> [~rhauch] suggested a mechanism to conditionally apply any SMT but was
> concerned about the possibility of a naming collision (assuming it was
> configured by a simple config).
> I'd like to propose something which would solve this problem without the
> possibility of such collisions. The idea is to have a higher-level condition,
> which applies an arbitrary transformation (or transformation chain) according
> to some predicate on the record.
> More concretely, it might be configured like this:
> {noformat}
> transforms.conditionalExtract.type: Conditional
> transforms.conditionalExtract.transforms: extractInt
> transforms.conditionalExtract.transforms.extractInt.type: org.apache.kafka.connect.transforms.ExtractField$Key
> transforms.conditionalExtract.transforms.extractInt.field: c1
> transforms.conditionalExtract.condition: topic-matches:<someRegexHere>
> {noformat}
> * The {{Conditional}} SMT is configured with its own list of transforms
> ({{transforms.conditionalExtract.transforms}}) to apply.
> This would work just like the top-level {{transforms}} config, so subkeys
> can be used to configure these transforms in the usual way.
> * The {{condition}} config defines the predicate for when the transforms are
> applied to a record, using a {{<condition-type>:<parameters>}} syntax.
> We could initially support three condition types:
> *{{topic-matches:<pattern>}}* The transformation would be applied if the
> record's topic name matched the given regular expression pattern. For
> example, the following would apply the transformation on records being sent
> to any topic with a name beginning with "my-prefix-":
> {noformat}
> transforms.conditionalExtract.condition: topic-matches:my-prefix-.*
> {noformat}
>
> *{{has-header:<header-name>}}* The transformation would be applied if the
> record had at least one header with the given name. For example, the
> following will apply the transformation on records with at least one header
> with the name "my-header":
> {noformat}
> transforms.conditionalExtract.condition: has-header:my-header
> {noformat}
>
> *{{not:<condition-name>}}* This would negate the result of another named
> condition using the condition config prefix. For example, the following will
> apply the transformation on records which lack any header with the name
> "my-header":
> {noformat}
> transforms.conditionalExtract.condition: not:hasMyHeader
> transforms.conditionalExtract.condition.hasMyHeader: has-header:my-header
> {noformat}
> I foresee one implementation concern with this approach, which is that
> currently {{Transformation}} has to return a fixed {{ConfigDef}}, and this
> proposal would require something more flexible in order to allow the config
> parameters to depend on the listed transform aliases (and similarly for the
> named predicate used by the {{not:}} predicate). I think this could be done by
> adding a {{default}} method to {{Transformation}} for getting the {{ConfigDef}}
> given the config, for example.
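
For illustration only, the {{<condition-type>:<parameters>}} syntax quoted above could be evaluated roughly as in the following Java sketch. It is not Kafka Connect code and not necessarily what KIP-585 adopted: the class {{ConditionSketch}}, the {{Condition}} interface, and the {{parse}} method are hypothetical names, a record is reduced to its topic name plus a set of header names, and the {{named}} map stands in for the conditions declared under the condition config prefix.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.regex.Pattern;

/** Hypothetical sketch: evaluates the condition strings proposed in the issue. */
public class ConditionSketch {

    /** Tests a record, reduced here to (topic name, set of header names). */
    interface Condition extends BiPredicate<String, Set<String>> {}

    /**
     * Parses "<condition-type>:<parameters>".
     * `named` maps a condition alias (e.g. "hasMyHeader") to its raw spec,
     * standing in for entries under the condition config prefix.
     * Note: this sketch does not detect cycles between named conditions.
     */
    static Condition parse(String spec, Map<String, String> named) {
        int colon = spec.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException(
                "Expected <condition-type>:<parameters> but got: " + spec);
        }
        String type = spec.substring(0, colon).trim();
        // Everything after the first colon is the parameter, so regexes may contain ':'.
        String params = spec.substring(colon + 1).trim();
        switch (type) {
            case "topic-matches": {
                Pattern pattern = Pattern.compile(params);
                // The whole topic name must match the pattern.
                return (topic, headers) -> pattern.matcher(topic).matches();
            }
            case "has-header":
                return (topic, headers) -> headers.contains(params);
            case "not": {
                String target = named.get(params);
                if (target == null) {
                    throw new IllegalArgumentException("Unknown named condition: " + params);
                }
                Condition inner = parse(target, named);
                return (topic, headers) -> !inner.test(topic, headers);
            }
            default:
                throw new IllegalArgumentException("Unknown condition type: " + type);
        }
    }

    public static void main(String[] args) {
        Map<String, String> named = Map.of("hasMyHeader", "has-header:my-header");
        Condition byTopic = parse("topic-matches:my-prefix-.*", named);
        Condition lacksHeader = parse("not:hasMyHeader", named);
        System.out.println(byTopic.test("my-prefix-orders", Set.of()));
        System.out.println(lacksHeader.test("any-topic", Set.of("my-header")));
    }
}
```

Resolving {{not:}} through a map of named conditions mirrors the two-line {{not:hasMyHeader}} example in the description, where the negated condition is declared separately under its own alias.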
> Obviously this would require a KIP, but before I spend any more time on this
> I'd be interested in your thoughts [~rhauch], [~rmoff], [~gunnar.morling].

--
This message was sent by Atlassian Jira
(v8.3.4#803005)