[ 
https://issues.apache.org/jira/browse/KAFKA-6396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16317752#comment-16317752
 ] 

Ewen Cheslack-Postava commented on KAFKA-6396:
----------------------------------------------

If I'm understanding your request properly, there are a couple of problems with 
what you're proposing.

First, for transformations it would be a fundamental change to how they work. 
Today they work as SMTs: single message transforms, which means you get the 
entire message. For sink connectors we a) convert the entire message, b) 
transform the entire message, c) process the entire message in the sink 
connector. To pass the data to stage (b), it *must* have been fully processed, 
key and value, by (a).
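
As a rough illustration of the ordering described above, here is a minimal 
sketch of the three sink-side stages in plain Java. It does not use the Connect 
API: `SinkPipelineSketch`, `Rec`, `convert`, `run` and `bytes` are hypothetical 
names, and the "converter" is a stand-in that only decodes UTF-8. The point is 
that the value is converted for every record, including records the transform 
drops afterwards.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class SinkPipelineSketch {
    // Hypothetical stand-in for a converted record; not a Connect class.
    static final class Rec {
        final String key;
        final String value;
        Rec(String key, String value) { this.key = key; this.value = value; }
    }

    // Counts value conversions so the cost of stage (a) is visible.
    static int valueConversions = 0;

    // Stand-in converter (assumption): decodes bytes as UTF-8.
    static String convert(byte[] raw) {
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    static byte[] bytes(String s) { return s.getBytes(StandardCharsets.UTF_8); }

    // Each raw record is a {keyBytes, valueBytes} pair.
    static List<Rec> run(List<byte[][]> rawRecords, UnaryOperator<Rec> transform) {
        List<Rec> delivered = new ArrayList<>();
        for (byte[][] raw : rawRecords) {
            // (a) convert the entire message, key and value
            String key = convert(raw[0]);
            String value = convert(raw[1]);
            valueConversions++;
            // (b) transform the entire message; a null result drops it
            Rec transformed = transform.apply(new Rec(key, value));
            // (c) hand surviving records to the sink
            if (transformed != null) {
                delivered.add(transformed);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<byte[][]> raw = new ArrayList<>();
        raw.add(new byte[][] {bytes("catalog"), bytes("v1")});
        raw.add(new byte[][] {bytes("other"), bytes("v2")});
        List<Rec> out = run(raw, r -> "catalog".equals(r.key) ? r : null);
        System.out.println(out.size() + " delivered, "
                + valueConversions + " values converted");
    }
}
```

In this sketch the transform sees a record only after both of its parts have 
been converted, so filtering in stage (b) cannot save the cost of stage (a).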

Second, what you want to do by checking the value for `null` doesn't work 
because `null` is valid for values. Transformations only remove the record if 
*the entire record* is returned as `null`. A `null` value will be written and 
potentially used for compaction if it is contained in a surrounding record.
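
The difference between a `null` record and a `null` value can be shown with a 
small plain-Java sketch. Again, this is not the Connect API: `NullSemanticsSketch`, 
`Rec` and `applyTransform` are hypothetical names chosen for illustration, and 
the loop only mimics the rule stated above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class NullSemanticsSketch {
    // Hypothetical stand-in for a record; not a Connect class.
    static final class Rec {
        final String key;
        final String value; // may legitimately be null (e.g. a delete marker)
        Rec(String key, String value) { this.key = key; this.value = value; }
    }

    // Mimics the rule described above: only a null *record* is dropped.
    // A record whose value is null is passed through untouched.
    static List<Rec> applyTransform(List<Rec> input, UnaryOperator<Rec> transform) {
        List<Rec> output = new ArrayList<>();
        for (Rec rec : input) {
            Rec result = transform.apply(rec);
            if (result != null) {
                output.add(result);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<Rec> input = new ArrayList<>();
        input.add(new Rec("a", "payload"));
        input.add(new Rec("b", null));      // null value: kept
        input.add(new Rec("skip", "x"));    // transform returns null: dropped
        List<Rec> output = applyTransform(input, r -> "skip".equals(r.key) ? null : r);
        System.out.println(output.size() + " records passed on to the sink");
    }
}
```

If the framework treated a `null` value as "drop this record", the second 
record in this example could never reach the sink.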

It would certainly be possible to write a system that worked the way you 
describe, but it requires a much more complicated processing pipeline. You need 
to define the order in which each component of the message is deserialized, 
define different transformations for each (as well as potentially a 
transformation for the entire record if you want to support functionality that 
Kafka Connect currently supports, such as copying data between key/value). This 
gets even more complicated when you consider all the components, not all of 
which are in Kafka Connect yet: key, value, headers, timestamp, etc.

To me, this smells like trying to fit a pretty highly optimized transformation 
pipeline into Connect simply because most of the building blocks are there to 
do so without coding. (I would consider any case where you're trying to avoid 
deserializing *parts* of a record a pretty optimized use case.) Personally, 
I'd recommend writing a small Kafka Streams app to handle this case, where you 
can carefully select how to deserialize and process the data, and interleave 
the processing of most components of the record carefully to optimize 
performance.
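
The kind of selective processing meant here can be sketched in plain Java. This 
is deliberately not written against the Kafka Streams API; it only illustrates 
the idea of deciding on the key before touching the value. `KeyFirstSketch`, 
`decode`, `process`, `bytes` and the `catalog:` key prefix are hypothetical, 
and the UTF-8 decoding stands in for whatever deserializer is actually in use.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class KeyFirstSketch {
    // Counts value deserializations so the saving is visible.
    static int valueDeserializations = 0;

    // Stand-in deserializer (assumption): decodes bytes as UTF-8.
    static String decode(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    static byte[] bytes(String s) { return s.getBytes(StandardCharsets.UTF_8); }

    // Each raw record is a {keyBytes, valueBytes} pair.
    static List<String> process(List<byte[][]> rawRecords, Predicate<String> keyFilter) {
        List<String> values = new ArrayList<>();
        for (byte[][] raw : rawRecords) {
            String key = decode(raw[0]);      // the key is always deserialized
            if (!keyFilter.test(key)) {
                continue;                     // skip before touching the value
            }
            valueDeserializations++;
            values.add(decode(raw[1]));       // the value only when it is needed
        }
        return values;
    }

    public static void main(String[] args) {
        List<byte[][]> raw = new ArrayList<>();
        raw.add(new byte[][] {bytes("catalog:1"), bytes("company A")});
        raw.add(new byte[][] {bytes("other:2"), bytes("ignored")});
        raw.add(new byte[][] {bytes("catalog:3"), bytes("company B")});
        List<String> values = process(raw, k -> k.startsWith("catalog:"));
        System.out.println(values.size() + " values kept, "
                + valueDeserializations + " values deserialized");
    }
}
```

In a real application the same shape would apply: consume keys and values as 
raw bytes, filter on the deserialized key, and deserialize the value only for 
the records that pass the filter.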

> Possibly kafka-connect converter should be able to stop processing chain
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-6396
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6396
>             Project: Kafka
>          Issue Type: Wish
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>            Reporter: Alexander Koval
>            Priority: Minor
>
> At present only transformations can discard records, by returning null. But I 
> think sometimes it would be nice to stop the processing chain after converting 
> a message. For example, I have some tags shipped with a message key and I want 
> to stop processing the message after converting its key (there are a lot of 
> messages and I don't want to deserialize message values that I don't need).
> At the moment, to do that I have to disable converters and move message 
> deserialization to the transformation chain:
> {code}
> key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
> value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
> transforms=proto,catalog
> transforms.proto.type=company.evo.kafka.ProtobufTransformation
> transforms.proto.key.protobuf.class=company.evo.uaprom.indexator.KeyProto$KeyMessage
> transforms.proto.value.protobuf.class=company.evo.uaprom.indexator.catalog.CompanyProto$UniversalCompanyMessage
> transforms.proto.tag=catalog
> {code}
> If 
> [WorkerSinkTask|https://github.com/apache/kafka/blob/1.0.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L453]
>  checked converted values for {{null}}, it would solve my problem more 
> gracefully.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
