In this case, the user is attempting to handle errors when parsing the
timestamp. The timestamp controls the watermark for the UnboundedSource;
how would they control the watermark in a downstream ParDo?
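For illustration, the try/catch routing such a dead-letter ParDo performs can be sketched in plain Java. This is a stdlib-only sketch of the routing logic, not actual Beam API: the names `Routed` and `route` are invented here, and `Routed` just stands in for the two outputs a multi-output ParDo would tag with TupleTags. Each record either yields a parsed timestamp or is diverted, raw, into a dead-letter collection instead of throwing:

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;

public class DeadLetterSketch {
    // Holds the two "outputs" a multi-output ParDo would produce:
    // parsed timestamps, and the raw payloads that failed to parse.
    static class Routed {
        final List<Instant> success = new ArrayList<>();
        final List<String> deadLetter = new ArrayList<>();
    }

    // Mirrors the per-element logic: try to extract the timestamp and,
    // on failure, divert the raw record instead of letting it throw.
    static Routed route(List<String> rawRecords) {
        Routed out = new Routed();
        for (String record : rawRecords) {
            try {
                out.success.add(Instant.parse(record));
            } catch (DateTimeParseException e) {
                out.deadLetter.add(record);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Routed out = route(List.of("2018-10-24T09:48:00Z", "not-a-timestamp"));
        System.out.println(out.success.size() + " parsed, "
                + out.deadLetter.size() + " dead-lettered");
    }
}
```

In a real pipeline the two lists would instead be two tagged outputs of one ParDo, but the key point is the same: the parse failure is caught per record rather than failing the source, which is why it has to happen after the source has handed over raw bytes.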

On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <rang...@google.com> wrote:

> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>> Ah nice. Yeah, if the user can return raw bytes instead of applying a
>> function that could result in an exception, the value can be extracted
>> by a ParDo down the line.
>>
>
> KafkaIO does return bytes, and I think most sources should, unless there
> is a good reason not to.
> Given that, do we think Beam should provide a transform that makes it
> simpler to handle dead-letter output? I think there was a thread about
> this in the past.
>
>
>>
>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <jcgarc...@gmail.com>
>> wrote:
>>
>>> As Raghu said,
>>>
>>> Just apply a regular ParDo and return a PCollectionTuple; after that you
>>> can extract your success records (TupleTag) and your dead-letter records
>>> (TupleTag) and do whatever you want with them.
>>>
>>>
>>> Raghu Angadi <rang...@google.com> schrieb am Mi., 24. Okt. 2018, 05:18:
>>>
>>>> A user can read serialized bytes from KafkaIO and deserialize them
>>>> explicitly in a ParDo, which gives complete control over how to handle
>>>> record errors. This is what I would do if I needed to in my pipeline.
>>>>
>>>> If there is a transform in Beam that does this, it could be convenient
>>>> for users in many such scenarios. This is simpler than each source
>>>> supporting it explicitly.
>>>>
>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>> chamik...@google.com> wrote:
>>>>
>>>>> Given that KafkaIO uses the UnboundedSource framework, this is probably
>>>>> not something that can easily be supported. We might be able to support
>>>>> similar features when we have Kafka on top of Splittable DoFn, though.
>>>>> So feel free to create a feature request JIRA for this.
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <k...@google.com>
>>>>> wrote:
>>>>>
>>>>>> This is a great question. I've added the dev list to be sure it gets
>>>>>> noticed by whoever may know best.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>>
>>>>>>>
>>>>>>> Hi,
>>>>>>>
Is there a way to get a dead-letter output from a pipeline that uses
>>>>>>> a KafkaIO connector for its input? As
>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>>> SerializableFunction and not a ParDo, how would I be able to produce
>>>>>>> a dead-letter output from it?
>>>>>>>
>>>>>>> I have the following pipeline defined that reads from a KafkaIO
>>>>>>> input:
>>>>>>>
>>>>>>> pipeline.apply(
>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>>     .withTopics(topics)
>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>     .updateConsumerProperties(
>>>>>>>         ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>> inputMessagesConfig))
>>>>>>>     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
>>>>>>> "earliest"))
>>>>>>>     .updateConsumerProperties(ImmutableMap.of("group.id",
>>>>>>> "beam-consumers"))
>>>>>>>     .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
>>>>>>> "true"))
>>>>>>>     .withTimestampPolicyFactory(
>>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>     .withReadCommitted()
>>>>>>>     .commitOffsetsInFinalize())
>>>>>>>
>>>>>>>
and I would like to get dead-letter outputs when my timestamp
>>>>>>> extraction fails.
>>>>>>>
>>>>>>> Best,
>>>>>>> Tobi
>>>>>>>
>>>>>>>
