Users can read serialized bytes from KafkaIO and deserialize them explicitly
in a ParDo, which gives complete control over how record errors are handled.
This is what I would do if I needed this in my pipeline.
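
A rough sketch of the routing logic I have in mind, in plain Java so it runs
without Beam or Kafka on the classpath. Everything here (DeadLetterSketch,
Routed, extractTimestamp, and the assumption that the payload is an ISO-8601
instant encoded as UTF-8) is made up for illustration. In an actual pipeline
the loop body would live in a DoFn applied with ParDo, and the two lists would
be two tagged outputs (TupleTag plus ParDo's withOutputTags).

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of dead-letter routing; no Beam or Kafka classes are used. */
final class DeadLetterSketch {

    /** The two outputs: extracted timestamps, and raw payloads that failed. */
    static final class Routed {
        final List<Instant> parsed = new ArrayList<>();
        final List<byte[]> deadLetters = new ArrayList<>();
    }

    /** Hypothetical extractor: assumes the payload is an ISO-8601 instant in UTF-8. */
    static Instant extractTimestamp(byte[] payload) {
        return Instant.parse(new String(payload, StandardCharsets.UTF_8));
    }

    /** Tries to extract a timestamp from each raw record; failures keep their raw bytes. */
    static Routed route(List<byte[]> rawRecords) {
        Routed out = new Routed();
        for (byte[] raw : rawRecords) {
            try {
                out.parsed.add(extractTimestamp(raw));
            } catch (RuntimeException e) {
                // Any extraction failure sends the untouched bytes to the dead-letter output.
                out.deadLetters.add(raw);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Routed r = route(Arrays.asList(
                "2018-10-23T02:05:00Z".getBytes(StandardCharsets.UTF_8),
                "not-a-timestamp".getBytes(StandardCharsets.UTF_8)));
        System.out.println("parsed=" + r.parsed.size()
                + " deadLetters=" + r.deadLetters.size());
    }
}
```

The point of keeping the raw bytes on the failure path is that the dead-letter
output can be written somewhere and replayed later, which is not possible if
the failure happens inside the source's deserializer or timestamp function.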

If there were a transform in Beam that did this, it would be convenient for
users in many such scenarios. It would also be simpler than having each
source support it explicitly.

On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <chamik...@google.com>
wrote:

> Given that KafkaIO uses the UnboundedSource framework, this is probably not
> something that can easily be supported. We might be able to support similar
> features when we have Kafka on top of Splittable DoFn though.
>
> So feel free to create a feature request JIRA for this.
>
> Thanks,
> Cham
>
> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <k...@google.com> wrote:
>
>> This is a great question. I've added the dev list to be sure it gets
>> noticed by whoever may know best.
>>
>> Kenn
>>
>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <tobias.kay...@ricardo.ch>
>> wrote:
>>
>>>
>>> Hi,
>>>
>>> Is there a way to get a dead-letter output from a pipeline that uses a
>>> KafkaIO connector for its input? As
>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>> SerializableFunction and not a ParDo, how would I be able to produce a
>>> dead-letter output from it?
>>>
>>> I have the following pipeline defined that reads from a KafkaIO input:
>>>
>>> pipeline.apply(
>>>   KafkaIO.<String, String>read()
>>>     .withBootstrapServers(bootstrap)
>>>     .withTopics(topics)
>>>     .withKeyDeserializer(StringDeserializer.class)
>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>     .updateConsumerProperties(
>>>         ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>     .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>     .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>     .withTimestampPolicyFactory(
>>>         TimestampPolicyFactory.withTimestampFn(
>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>     .withReadCommitted()
>>>     .commitOffsetsInFinalize())
>>>
>>>
>>> and I would like to get dead-letter outputs when my timestamp extraction fails.
>>>
>>> Best,
>>> Tobi
>>>
>>>
