In this case, the user is attempting to handle errors when parsing the timestamp. Since the timestamp controls the watermark for the UnboundedSource, how would they control the watermark from a downstream ParDo?
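One way to reconcile those two constraints is to make the timestamp function total: on a parse failure it falls back to processing time (so the watermark keeps advancing) instead of throwing, and the bad record is detected and dead-lettered by re-parsing in a downstream ParDo. This is a workaround suggested here, not something the thread settles on; the sketch below is plain Java with illustrative names, not Beam APIs.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.function.Function;

// A "total" timestamp extractor: it never throws, so it is safe to hand to a
// timestamp policy. On a parse failure it falls back to processing time, which
// keeps the watermark advancing; the unparseable record can still be routed to
// a dead-letter output by a downstream ParDo.
public class TotalTimestampFn {

    public static final Function<String, Instant> extractTimestamp = raw -> {
        try {
            return Instant.parse(raw); // ISO-8601, e.g. "2018-10-23T19:43:00Z"
        } catch (DateTimeParseException e) {
            return Instant.now();      // fallback: processing time, never throw
        }
    };

    public static void main(String[] args) {
        System.out.println(extractTimestamp.apply("2018-10-23T19:43:00Z"));
        System.out.println(extractTimestamp.apply("garbage"));
    }
}
```

Note the trade-off: records that fail extraction get an artificial (processing-time) event timestamp, so they should be filtered out or tagged before any windowing that depends on event time.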
On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <rang...@google.com> wrote:

> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <chamik...@google.com> wrote:
>
>> Ah nice. Yeah, if the user can return full bytes instead of applying a
>> function that could result in an exception, this can be extracted by a
>> ParDo down the line.
>
> KafkaIO does return bytes, and I think most sources should, unless there
> is a good reason not to.
> Given that, do we think Beam should provide a transform that makes it
> simpler to handle dead-letter output? I think there was a thread about it
> in the past.
>
>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <jcgarc...@gmail.com> wrote:
>>
>>> As Raghu said,
>>>
>>> Just apply a regular ParDo and return a PCollectionTuple; after that you
>>> can extract your successful records (TupleTag) and your dead-letter
>>> records (TupleTag) and do whatever you want with them.
>>>
>>> Raghu Angadi <rang...@google.com> wrote on Wed., Oct. 24, 2018, 05:18:
>>>
>>>> Users can read serialized bytes from KafkaIO and deserialize explicitly
>>>> in a ParDo, which gives complete control over how to handle record
>>>> errors. This is what I would do if I needed to in my pipeline.
>>>>
>>>> If there were a transform in Beam that does this, it could be convenient
>>>> for users in many such scenarios. This is simpler than each source
>>>> supporting it explicitly.
>>>>
>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>
>>>>> Given that KafkaIO uses the UnboundedSource framework, this is probably
>>>>> not something that can easily be supported. We might be able to support
>>>>> similar features when we have Kafka on top of Splittable DoFn, though.
>>>>> So feel free to create a feature request JIRA for this.
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <k...@google.com> wrote:
>>>>>
>>>>>> This is a great question.
>>>>>> I've added the dev list to be sure it gets noticed by whoever may
>>>>>> know best.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Is there a way to get a dead-letter output from a pipeline that uses
>>>>>>> a KafkaIO connector for its input? As
>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>>> SerializableFunction and not a ParDo, how would I be able to produce
>>>>>>> a dead-letter output from it?
>>>>>>>
>>>>>>> I have the following pipeline defined that reads from a KafkaIO
>>>>>>> input:
>>>>>>>
>>>>>>> pipeline.apply(
>>>>>>>     KafkaIO.<String, String>read()
>>>>>>>         .withBootstrapServers(bootstrap)
>>>>>>>         .withTopics(topics)
>>>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>>>         .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>         .updateConsumerProperties(
>>>>>>>             ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>>         .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>         .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>         .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>         .withTimestampPolicyFactory(
>>>>>>>             TimestampPolicyFactory.withTimestampFn(
>>>>>>>                 new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>         .withReadCommitted()
>>>>>>>         .commitOffsetsInFinalize())
>>>>>>>
>>>>>>> and I'd like to get dead-letter outputs when my timestamp extraction
>>>>>>> fails.
>>>>>>>
>>>>>>> Best,
>>>>>>> Tobi
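The approach recommended in the thread above (read raw bytes from KafkaIO, deserialize explicitly in a ParDo, and split successes from failures via TupleTags) comes down to per-record logic like the following. The sketch is reduced to plain, self-contained Java: the two lists stand in for the main and dead-letter outputs of a multi-output DoFn, and all names are illustrative, not Beam APIs.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;

// Dead-letter routing pattern: try to parse each record, emit to the main
// output on success and to the dead-letter output on failure, never throw.
// In a Beam pipeline the two lists below would be TupleTag outputs of a ParDo.
public class DeadLetterSketch {

    public static final List<Long> successes = new ArrayList<>();
    public static final List<String> deadLetters = new ArrayList<>();

    // Stand-in for the body of a DoFn's @ProcessElement method.
    public static void process(String rawRecord) {
        try {
            long epochMillis = Instant.parse(rawRecord).toEpochMilli();
            successes.add(epochMillis);  // main output
        } catch (DateTimeParseException e) {
            deadLetters.add(rawRecord);  // dead-letter output
        }
    }

    public static void main(String[] args) {
        process("2018-10-23T19:43:00Z"); // valid ISO-8601 timestamp
        process("not-a-timestamp");      // lands in the dead-letter output
        System.out.println(successes.size() + " ok, " + deadLetters.size() + " dead");
    }
}
```

In the actual pipeline, this ParDo would follow a KafkaIO read configured with Kafka's ByteArrayDeserializer, so that no exception can occur inside the source itself.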