[ https://issues.apache.org/jira/browse/BEAM-1573?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923595#comment-15923595 ]

Raghu Angadi commented on BEAM-1573:
------------------------------------

Thanks [~peay]. Please go ahead and send a GitHub PR. It does not need to be 
ready; a partial implementation is also fine. Just mention that it is not ready.

A related issue is how we want to handle deserialization errors. It looks like 
KafkaConsumer.poll() throws KafkaException in case of deserialization errors, 
so we don't know which record caused the error either. What if a runner wants 
to skip the record (or a bundle) that causes such errors (not sure if any of 
the runners do this)? This is not a deal breaker; it is fine as long as we 
properly propagate the exception to the user.

One option is to invoke the deserializer ourselves inside advance(). This 
gives us full control over how we handle deserialization errors, as sketched 
below.
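
Roughly something like this: configure the consumer with ByteArrayDeserializer 
for both key and value, then apply the user's deserializers per record inside 
advance(). This is only a rough sketch; RecordDecoder and its method names are 
just for illustration, not existing KafkaIO code.

{code:java}
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;

// Decodes the raw byte[] records fetched by a consumer that is configured
// with ByteArrayDeserializer for both key and value.
class RecordDecoder<K, V> {
  private final Deserializer<K> keyDeserializer;
  private final Deserializer<V> valueDeserializer;

  RecordDecoder(Deserializer<K> keyDeserializer,
                Deserializer<V> valueDeserializer) {
    this.keyDeserializer = keyDeserializer;
    this.valueDeserializer = valueDeserializer;
  }

  // Called from advance() for each raw record. A failure here identifies
  // the exact record, so the reader can rethrow with context or skip it,
  // depending on policy.
  KV<K, V> decode(ConsumerRecord<byte[], byte[]> raw) {
    K key = keyDeserializer.deserialize(raw.topic(), raw.key());
    V value = valueDeserializer.deserialize(raw.topic(), raw.value());
    return KV.of(key, value);
  }
}
{code}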
Another option, which also keeps track of serialized size: we could have a 
wrapper deserializer class that stores the byte size and invokes the user's 
deserializer. This also gives us control over how we handle the exceptions. 
See the sketch after this paragraph.
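
A rough sketch of such a wrapper (SizeTrackingDeserializer is just an 
illustrative name; the Deserializer interface is Kafka's):

{code:java}
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

// Wraps the user's Deserializer: records the serialized byte size of each
// record, and is the single place where deserialization exceptions surface,
// per record rather than out of KafkaConsumer.poll().
public class SizeTrackingDeserializer<T> implements Deserializer<T> {
  private final Deserializer<T> delegate;
  private long lastRecordSize;

  public SizeTrackingDeserializer(Deserializer<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    delegate.configure(configs, isKey);
  }

  @Override
  public T deserialize(String topic, byte[] data) {
    lastRecordSize = (data == null) ? 0 : data.length;
    // Any exception from the user's deserializer propagates from here,
    // where the caller knows exactly which record caused it.
    return delegate.deserialize(topic, data);
  }

  public long getLastRecordSize() {
    return lastRecordSize;
  }

  @Override
  public void close() {
    delegate.close();
  }
}
{code}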


> KafkaIO does not allow using Kafka serializers and deserializers
> ----------------------------------------------------------------
>
>                 Key: BEAM-1573
>                 URL: https://issues.apache.org/jira/browse/BEAM-1573
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-extensions
>    Affects Versions: 0.4.0, 0.5.0
>            Reporter: peay
>            Assignee: Raghu Angadi
>            Priority: Minor
>             Fix For: Not applicable
>
>
> KafkaIO does not allow overriding the serializer and deserializer settings 
> of the Kafka consumers and producers it uses internally. Instead, it allows 
> setting a `Coder`, and has a simple Kafka serializer/deserializer wrapper 
> class that calls the coder.
> I appreciate that supporting Beam coders is good and consistent with the 
> rest of the system. However, is there a reason to completely disallow using 
> custom Kafka serializers instead?
> This is a limitation when working with an Avro schema registry, for 
> instance, which requires custom serializers. One can write a `Coder` that 
> wraps a custom Kafka serializer, but that means two levels of unnecessary 
> wrapping.
> In addition, the `Coder` abstraction is not equivalent to Kafka's 
> `Serializer`, which gets the topic name as input. Using a `Coder` wrapper 
> would require duplicating the output topic setting in the argument to 
> `KafkaIO` and when building the wrapper, which is not elegant and is 
> error-prone.


