I see. What I meant was to return min_timestamp for bad records in the timestamp handler passed to KafkaIO itself, and the correct timestamp for parsable records. That should work too, right?
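A minimal sketch of the timestamp handler described above. It assumes, hypothetically, that the record value is text holding epoch milliseconds; `MinTimestampOnFailureFn` and `timestampOrMin` are illustrative names, not part of Beam. Parsable records keep their own timestamp and everything else gets `BoundedWindow.TIMESTAMP_MIN_VALUE`, the default timestamp mentioned further down the thread. Whether this interacts well with the watermark is exactly the open question in this message, so treat it as unconfirmed.

```java
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

/**
 * Hypothetical timestamp function for KafkaIO: the parsed event time for good records,
 * BoundedWindow.TIMESTAMP_MIN_VALUE for records whose timestamp cannot be parsed.
 */
class MinTimestampOnFailureFn
    implements SerializableFunction<KafkaRecord<String, String>, Instant> {

  /** Assumption for illustration: the value is text holding epoch milliseconds. */
  static Instant timestampOrMin(String value) {
    if (value == null) {
      return BoundedWindow.TIMESTAMP_MIN_VALUE;
    }
    try {
      Instant ts = new Instant(Long.parseLong(value.trim()));
      // Values outside the range Beam accepts are treated like unparsable ones.
      if (ts.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)
          || ts.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
        return BoundedWindow.TIMESTAMP_MIN_VALUE;
      }
      return ts;
    } catch (NumberFormatException e) {
      // Unparsable: fall back to the minimum timestamp instead of throwing.
      return BoundedWindow.TIMESTAMP_MIN_VALUE;
    }
  }

  @Override
  public Instant apply(KafkaRecord<String, String> record) {
    return timestampOrMin(record.getKV().getValue());
  }
}
```

It would be passed where `MessageTimestampExtractor` is today, e.g. `TimestampPolicyFactory.withTimestampFn(new MinTimestampOnFailureFn())`, which the thread notes is not meant to be public API. A downstream ParDo could then route elements whose timestamp equals `BoundedWindow.TIMESTAMP_MIN_VALUE` to a dead-letter output. Per Lukasz's caveat, once parsable records advance the watermark those min-timestamp elements may be droppably late, so the split should happen before any windowing or grouping.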
On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik <lc...@google.com> wrote:
> Yes, that would be fine.
>
> The user could then use a ParDo which outputs to a DLQ for things it can't parse the timestamp for and use outputWithTimestamp[1] for everything else.
>
> 1: https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>
> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <rang...@google.com> wrote:
>
>> Thanks. So returning min timestamp is OK, right (assuming the application is fine with what that means)?
>>
>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> All records in Apache Beam have a timestamp. The default timestamp is the min timestamp defined here:
>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>>
>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <rang...@google.com> wrote:
>>>
>>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> You would have to return min timestamp for all records, otherwise the watermark may have advanced and you would be outputting records that are droppably late.
>>>>>
>>>>
>>>> That would be fine, I guess. What's the timestamp for a record that doesn't have one?
>>>>
>>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <rang...@google.com> wrote:
>>>>>
>>>>>> To be clear, returning min_timestamp for unparsable records should not affect the watermark.
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <rang...@google.com> wrote:
>>>>>>
>>>>>>> How about returning min_timestamp? They would be dropped or redirected by the ParDo after that.
>>>>>>>
>>>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API. Is this pipeline defined under the kafkaio package?
>>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> In this case, the user is attempting to handle errors when parsing the timestamp. The timestamp controls the watermark for the UnboundedSource; how would they control the watermark in a downstream ParDo?
>>>>>>>>
>>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <rang...@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Ah nice. Yeah, if the user can return full bytes instead of applying a function that would result in an exception, this can be extracted by a ParDo down the line.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> KafkaIO does return bytes, and I think most sources should, unless there is a good reason not to.
>>>>>>>>> Given that, do we think Beam should provide a transform that makes it simpler to handle dead-letter output? I think there was a thread about it in the past.
>>>>>>>>>
>>>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <jcgarc...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> As Raghu said,
>>>>>>>>>>>
>>>>>>>>>>> Just apply a regular ParDo and return a PCollectionTuple. After that you can extract your success records (TupleTag) and your dead-letter records (TupleTag) and do whatever you want with them.
>>>>>>>>>>>
>>>>>>>>>>> Raghu Angadi <rang...@google.com> wrote on Wed, Oct 24, 2018, 05:18:
>>>>>>>>>>>
>>>>>>>>>>>> The user can read serialized bytes from KafkaIO and deserialize explicitly in a ParDo, which gives complete control over how to handle record errors. This is what I would do if I needed to in my pipeline.
>>>>>>>>>>>>
>>>>>>>>>>>> If there is a transform in Beam that does this, it could be convenient for users in many such scenarios. This is simpler than each source supporting it explicitly.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is probably not something that can easily be supported. We might be able to support similar features when we have Kafka on top of Splittable DoFn, though.
>>>>>>>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is a great question. I've added the dev list to be sure it gets noticed by whoever may know best.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Is there a way to get a dead-letter output from a pipeline that uses a KafkaIO connector for its input? As `TimestampPolicyFactory.withTimestampFn()` takes only a SerializableFunction and not a ParDo, how would I be able to produce a dead-letter output from it?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have the following pipeline defined that reads from a KafkaIO input:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>     KafkaIO.<String, String>read()
>>>>>>>>>>>>>>>         .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>>>         .withTopics(topics)
>>>>>>>>>>>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>>>         .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>>>>>             ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>>>>>>>>>>         .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>>>>         .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>>>>         .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>>>>         .withTimestampPolicyFactory(
>>>>>>>>>>>>>>>             TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>>>                 new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>>>         .withReadCommitted()
>>>>>>>>>>>>>>>         .commitOffsetsInFinalize());
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> and I'd like to get dead-letter outputs when my timestamp extraction fails.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Tobi
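The approach suggested in the replies above (deserialize in a ParDo, return a PCollectionTuple with success and dead-letter outputs, and re-stamp good records with outputWithTimestamp) could look roughly like this. It is a sketch under stated assumptions: the input is `KV<byte[], byte[]>` as produced by `KafkaIO.readBytes()` followed by `withoutMetadata()`, the value is UTF-8 text holding epoch milliseconds, and `ParseTimestampOrDeadLetter`, `PARSED`, `DEAD_LETTER` and `split` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Instant;

/** Hypothetical DoFn: records whose timestamp cannot be parsed go to a dead-letter output. */
class ParseTimestampOrDeadLetter extends DoFn<KV<byte[], byte[]>, KV<byte[], byte[]>> {
  // Main output: records re-stamped with their parsed event time.
  static final TupleTag<KV<byte[], byte[]>> PARSED = new TupleTag<KV<byte[], byte[]>>() {};
  // Additional output: raw records kept as bytes so failures can be inspected or replayed.
  static final TupleTag<KV<byte[], byte[]>> DEAD_LETTER = new TupleTag<KV<byte[], byte[]>>() {};

  /** Assumption: the value is UTF-8 text holding epoch milliseconds. Returns null if unusable. */
  static Instant parseTimestamp(byte[] value) {
    if (value == null) {
      return null;
    }
    try {
      Instant ts = new Instant(Long.parseLong(new String(value, StandardCharsets.UTF_8).trim()));
      // Reject values outside the range of timestamps Beam accepts.
      if (ts.isBefore(BoundedWindow.TIMESTAMP_MIN_VALUE)
          || ts.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
        return null;
      }
      return ts;
    } catch (NumberFormatException e) {
      return null;
    }
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    KV<byte[], byte[]> record = c.element();
    Instant ts = parseTimestamp(record.getValue());
    if (ts == null) {
      c.output(DEAD_LETTER, record);
    } else {
      // Fails at runtime if ts is earlier than the input element's timestamp
      // (default allowed skew is zero), hence the min-timestamp discussion in the thread.
      c.outputWithTimestamp(ts, record);
    }
  }

  /** Applies the DoFn and returns both outputs in one PCollectionTuple. */
  static PCollectionTuple split(PCollection<KV<byte[], byte[]>> records) {
    return records.apply(
        "ParseTimestampOrDeadLetter",
        ParDo.of(new ParseTimestampOrDeadLetter())
            .withOutputTags(PARSED, TupleTagList.of(DEAD_LETTER)));
  }
}
```

Usage, with `bootstrap` and `topics` as in the snippet above: `PCollectionTuple out = ParseTimestampOrDeadLetter.split(pipeline.apply(KafkaIO.readBytes().withBootstrapServers(bootstrap).withTopics(topics).withoutMetadata()));`, then `out.get(ParseTimestampOrDeadLetter.PARSED)` for the good records and `out.get(ParseTimestampOrDeadLetter.DEAD_LETTER)` for the rest. Because `outputWithTimestamp` may not move an element earlier than its input timestamp by default, the thread suggests having the source assign the minimum timestamp; this sketch does not settle how the source watermark would then advance, which Lukasz raises as an open question.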