[ 
https://issues.apache.org/jira/browse/KAFKA-9673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-9673.
----------------------------------
    Fix Version/s: 2.6.0
         Reviewer: Konstantine Karantasis
       Resolution: Fixed

KIP-585 was approved by the 2.6.0 KIP freeze, and the PR was approved and 
merged to `trunk` before 2.6.0 feature freeze.

> Conditionally apply SMTs
> ------------------------
>
>                 Key: KAFKA-9673
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9673
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Tom Bentley
>            Assignee: Tom Bentley
>            Priority: Major
>             Fix For: 2.6.0
>
>
> KAFKA-7052 ended up using IAE with a message, rather than NPE in the case of 
> a SMT being applied to a record lacking a given field. It's still not 
> possible to apply a SMT conditionally, which is what things like Debezium 
> really need in order to apply transformations only to non-schema change 
> events.
> [~rhauch] suggested a mechanism to conditionally apply any SMT but was 
> concerned about the possibility of a naming collision (assuming it was 
> configured by a simple config)
> I'd like to propose something which would solve this problem without the 
> possibility of such collisions. The idea is to have a higher-level condition, 
> which applies an arbitrary transformation (or transformation chain) according 
> to some predicate on the record. 
> More concretely, it might be configured like this:
> {noformat}
>   transforms.conditionalExtract.type: Conditional
>   transforms.conditionalExtract.transforms: extractInt
>   transforms.conditionalExtract.transforms.extractInt.type: 
> org.apache.kafka.connect.transforms.ExtractField$Key
>   transforms.conditionalExtract.transforms.extractInt.field: c1
>   transforms.conditionalExtract.condition: topic-matches:<someRegexHere>
> {noformat}
> * The {{Conditional}} SMT is configured with its own list of transforms 
> ({{transforms.conditionalExtract.transforms}}) to apply. This would work just 
> like the top level {{transforms}} config, so subkeys can be used to configure 
> these transforms in the usual way.
> * The {{condition}} config defines the predicate for when the transforms are 
> applied to a record using a {{<condition-type>:<parameters>}} syntax
> We could initially support three condition types:
> *{{topic-matches:<pattern>}}* The transformation would be applied if the 
> record's topic name matched the given regular expression pattern. For 
> example, the following would apply the transformation on records being sent 
> to any topic with a name beginning with "my-prefix-":
> {noformat}
>        transforms.conditionalExtract.condition: topic-matches:my-prefix-.*
> {noformat}
>    
> *{{has-header:<header-name>}}* The transformation would be applied if the 
> record had at least one header with the given name. For example, the 
> following will apply the transformation on records with at least one header 
> with the name "my-header":
> {noformat}
>        transforms.conditionalExtract.condition: has-header:my-header
> {noformat}
>    
> *{{not:<condition-name>}}* This would negate the result of another named 
> condition using the condition config prefix. For example, the following will 
> apply the transformation on records which lack any header with the name 
> my-header:
> {noformat}
>       transforms.conditionalExtract.condition: not:hasMyHeader
>       transforms.conditionalExtract.condition.hasMyHeader: 
> has-header:my-header
> {noformat}
> I foresee one implementation concern with this approach, which is that 
> currently {{Transformation}} has to return a fixed {{ConfigDef}}, and this 
> proposal would require something more flexible in order to allow the config 
> parameters to depend on the listed transform aliases (and similarly for named 
> predicate used for the {{not:}} predicate). I think this could be done by 
> adding a {{default}} method to {{Transformation}} for getting the ConfigDef 
> given the config, for example.
> Obviously this would require a KIP, but before I spend any more time on this 
> I'd be interested in your thoughts [~rhauch], [~rmoff], [~gunnar.morling].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to