On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <chamik...@google.com> wrote:
> Ah nice. Yeah, if the user can return raw bytes instead of applying a
> function that could result in an exception, this can be extracted by a
> ParDo down the line.
>
> KafkaIO does return bytes, and I think most sources should, unless there
> is a good reason not to. Given that, do we think Beam should provide a
> transform that makes it simpler to handle dead-letter output? I think
> there was a thread about this in the past.
>
> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <jcgarc...@gmail.com>
> wrote:
>
>> As Raghu said, just apply a regular ParDo and return a PCollectionTuple;
>> after that you can extract your success records (TupleTag) and your
>> dead-letter records (TupleTag) and do whatever you want with them.
>>
>> Raghu Angadi <rang...@google.com> wrote on Wed, Oct 24, 2018, 05:18:
>>
>>> Users can read serialized bytes from KafkaIO and deserialize them
>>> explicitly in a ParDo, which gives complete control over how to handle
>>> record errors. This is what I would do if I needed to in my pipeline.
>>>
>>> If there were a transform in Beam that does this, it could be convenient
>>> for users in many such scenarios. This is simpler than each source
>>> supporting it explicitly.
>>>
>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <chamik...@google.com>
>>> wrote:
>>>
>>>> Given that KafkaIO uses the UnboundedSource framework, this is probably
>>>> not something that can easily be supported. We might be able to support
>>>> similar features once we have Kafka on top of Splittable DoFn, though.
>>>> So feel free to create a feature-request JIRA for this.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <k...@google.com> wrote:
>>>>
>>>>> This is a great question. I've added the dev list to be sure it gets
>>>>> noticed by whoever may know best.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Is there a way to get a dead-letter output from a pipeline that uses
>>>>>> a KafkaIO connector for its input? Since
>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>> SerializableFunction and not a ParDo, how would I be able to produce
>>>>>> a dead-letter output from it?
>>>>>>
>>>>>> I have the following pipeline defined that reads from a KafkaIO input:
>>>>>>
>>>>>> pipeline.apply(
>>>>>>     KafkaIO.<String, String>read()
>>>>>>         .withBootstrapServers(bootstrap)
>>>>>>         .withTopics(topics)
>>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>>         .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>         .updateConsumerProperties(
>>>>>>             ImmutableMap.of(
>>>>>>                 InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>         .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>         .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>         .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>         .withTimestampPolicyFactory(
>>>>>>             TimestampPolicyFactory.withTimestampFn(
>>>>>>                 new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>         .withReadCommitted()
>>>>>>         .commitOffsetsInFinalize())
>>>>>>
>>>>>> and I would like to get a dead-letter output when my timestamp
>>>>>> extraction fails.
>>>>>>
>>>>>> Best,
>>>>>> Tobi
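[Editor's note: the "read bytes, deserialize in a ParDo, branch failures to a dead letter" pattern discussed in this thread can be sketched as plain Java, without Beam on the classpath. The two lists below stand in for the two TupleTags of a PCollectionTuple, and the record format (a UTF-8 epoch-millisecond string) and all class and method names are assumptions made up for illustration, not Beam API.]

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DeadLetterSketch {

    // Stand-ins for the two outputs of a PCollectionTuple:
    // one TupleTag for parsed timestamps, one for unparseable records.
    static final List<Long> successes = new ArrayList<>();
    static final List<byte[]> deadLetters = new ArrayList<>();

    // ParDo-like step: try to extract a timestamp from the raw record;
    // on failure, route the record to the dead-letter branch instead of
    // throwing (which is what a SerializableFunction would have to do).
    static void processElement(byte[] rawRecord) {
        try {
            String payload = new String(rawRecord, StandardCharsets.UTF_8);
            successes.add(Long.parseLong(payload));
        } catch (NumberFormatException e) {
            deadLetters.add(rawRecord);
        }
    }

    public static void main(String[] args) {
        byte[][] input = {
            "1540370000000".getBytes(StandardCharsets.UTF_8),
            "not-a-timestamp".getBytes(StandardCharsets.UTF_8),
        };
        for (byte[] record : input) {
            processElement(record);
        }
        System.out.println(successes.size() + " ok, "
            + deadLetters.size() + " dead-lettered");
    }
}
```

[In an actual Beam pipeline the same branching would live inside a DoFn's @ProcessElement method, emitting to two TupleTags via a MultiOutputReceiver, with the resulting PCollectionTuple queried by tag as Juan Carlos describes above.]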