On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <chamik...@google.com>
wrote:

> Ah nice. Yeah, if the user can return raw bytes instead of applying a
> function that might throw an exception, the deadletters can be extracted
> by a ParDo down the line.
>

KafkaIO does return bytes, and I think most sources should, unless there is
a good reason not to.
Given that, do we think Beam should provide a transform that makes it
simpler to handle deadletter output? I think there was a thread about this
in the past.
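The core of such a transform, independent of Beam's APIs, is just try/catch routing of each record: attempt the deserialization, and send failures (with their raw bytes) to a second output. A minimal, Beam-free sketch of that routing (class and method names here are hypothetical; in an actual multi-output ParDo the two lists would be the success and deadletter TupleTag outputs):

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DeadLetterSketch {
    // Strict UTF-8 decode that throws on malformed input; a plain
    // new String(bytes, UTF_8) would silently substitute bad bytes instead.
    static String decodeStrict(byte[] raw) throws CharacterCodingException {
        return StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT)
                .decode(ByteBuffer.wrap(raw))
                .toString();
    }

    // Route each raw record: successes to `ok`, undecodable records to `dead`.
    // In a Beam DoFn this try/catch would call output(successTag, ...) and
    // output(deadLetterTag, ...) instead of appending to lists.
    static void route(List<byte[]> records, List<String> ok, List<byte[]> dead) {
        for (byte[] raw : records) {
            try {
                ok.add(decodeStrict(raw));   // success path
            } catch (CharacterCodingException e) {
                dead.add(raw);               // deadletter path: keep raw bytes
            }
        }
    }

    public static void main(String[] args) {
        List<String> ok = new ArrayList<>();
        List<byte[]> dead = new ArrayList<>();
        route(Arrays.asList(
                "good".getBytes(StandardCharsets.UTF_8),
                new byte[]{(byte) 0xFF, (byte) 0xFE}),   // malformed UTF-8
              ok, dead);
        System.out.println(ok.size() + " ok, " + dead.size() + " dead");
    }
}
```

Keeping the raw bytes on the failure path is what makes the deadletter output useful: the original record can be inspected or replayed later.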


>
> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <jcgarc...@gmail.com>
> wrote:
>
>> As Raghu said,
>>
>> Just apply a regular ParDo and return a PCollectionTuple; after that you
>> can extract your success records (TupleTag) and your deadletter records
>> (TupleTag) and do whatever you want with them.
>>
>>
>> Raghu Angadi <rang...@google.com> schrieb am Mi., 24. Okt. 2018, 05:18:
>>
>>> Users can read the serialized bytes from KafkaIO and deserialize them
>>> explicitly in a ParDo, which gives complete control over how to handle
>>> record errors. This is what I would do if I needed to in my pipeline.
>>>
>>> If there is a transform in Beam that does this, it could be convenient
>>> for users in many such scenarios. This is simpler than each source
>>> supporting it explicitly.
>>>
>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <chamik...@google.com>
>>> wrote:
>>>
>>>> Given that KafkaIO uses the UnboundedSource framework, this is probably
>>>> not something that can easily be supported. We might be able to support
>>>> similar features when we have Kafka on top of Splittable DoFn, though.
>>>> So feel free to create a feature request JIRA for this.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <k...@google.com> wrote:
>>>>
>>>>> This is a great question. I've added the dev list to be sure it gets
>>>>> noticed by whoever may know best.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Is there a way to get a deadletter output from a pipeline that uses a
>>>>>> KafkaIO connector for its input? As
>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>> SerializableFunction and not a ParDo, how would I be able to produce
>>>>>> a deadletter output from it?
>>>>>>
>>>>>> I have the following pipeline defined that reads from a KafkaIO input:
>>>>>>
>>>>>> pipeline.apply(
>>>>>>   KafkaIO.<String, String>read()
>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>     .withTopics(topics)
>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>     .updateConsumerProperties(
>>>>>>         ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>             inputMessagesConfig))
>>>>>>     .updateConsumerProperties(
>>>>>>         ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>     .updateConsumerProperties(
>>>>>>         ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>     .updateConsumerProperties(
>>>>>>         ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>     .withTimestampPolicyFactory(
>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>     .withReadCommitted()
>>>>>>     .commitOffsetsInFinalize())
>>>>>>
>>>>>>
>>>>>> and I would like to get deadletter outputs when my timestamp
>>>>>> extraction fails.
>>>>>>
>>>>>> Best,
>>>>>> Tobi
>>>>>>
>>>>>>
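One workaround for the question above, given that `TimestampPolicyFactory.withTimestampFn()` accepts only a SerializableFunction (so no side output is available at that point): make the extractor total, i.e. never throw, and return a sentinel timestamp on failure; a downstream ParDo can then route records carrying the sentinel to a deadletter output. A minimal sketch of that idea using `java.time` (the class and method names are hypothetical, and Beam's actual timestamp API uses Joda-Time `Instant` rather than `java.time.Instant`):

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

// Hypothetical defensive timestamp extractor: instead of letting a parse
// failure throw inside the timestamp function (where no side output
// exists), return a fallback sentinel and let a later ParDo route records
// carrying the sentinel to a deadletter output.
public class SafeTimestampExtractor {
    // Sentinel marking "timestamp extraction failed".
    static final Instant FALLBACK = Instant.EPOCH;

    static Instant extract(String isoTimestampField) {
        try {
            return Instant.parse(isoTimestampField);
        } catch (DateTimeParseException | NullPointerException e) {
            return FALLBACK;   // never throw; flag the record instead
        }
    }
}
```

A downstream multi-output ParDo can then compare each element's timestamp against the sentinel and emit matches to the deadletter TupleTag, at the cost of those elements initially being assigned the fallback event time.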
