Yes, there is state in the timestamp policy, but it is simple, not like ParDo
state. A TimestampPolicy appears to be a long-lived instance. Take inspiration
from the already existing policies, e.g. the one here:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L135
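The simple state described above can be sketched without the Beam dependency. Everything below is illustrative: `LastGoodTimestampPolicy` is a hypothetical name, not the KafkaIO API. It only models the idea of a long-lived policy instance whose single piece of state is the timestamp of the last successfully parsed record.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical model (not the KafkaIO API) of a long-lived, per-partition
// timestamp policy whose only state is the last successfully parsed timestamp.
class LastGoodTimestampPolicy {
    // Start at Beam's minimum timestamp (BoundedWindow.TIMESTAMP_MIN_VALUE
    // is Long.MIN_VALUE microseconds, i.e. Long.MIN_VALUE / 1000 millis).
    private Instant lastGood = Instant.ofEpochMilli(Long.MIN_VALUE / 1000);

    /** Record timestamp: the parsed value on success, else the last good one. */
    Instant timestampFor(Optional<Instant> parsed) {
        parsed.ifPresent(ts -> lastGood = ts);
        return lastGood;
    }

    /** The watermark is based only on successfully parsed records. */
    Instant currentWatermark() {
        return lastGood;
    }
}
```

A real policy would extend KafkaIO's `TimestampPolicy<K, V>` and would likely compute the watermark as the last good timestamp minus some allowed delay, as the linked factory code does.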
On Mon, Oct 29, 2018 at 11:57 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:

> Sorry for not replying, I was sick with a flu.
>
> On Thu, Oct 25, 2018 at 9:56 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>
>> What I ended up doing, when I could not for any reason rely on Kafka
>> timestamps but needed to parse them from the message, is:
>>
>> * have a custom Kafka deserializer which never throws but returns a
>> message which is either a success with the parsed data structure plus
>> timestamp, or a failure with the original Kafka bytes payload
>> * a timestamp policy that can extract the timestamp from a successful
>> deserialize result; in the case of a failure result, I return the
>> timestamp of the last successful message (in my case messages are not
>> terribly out of order and failures are rather rare)
>
> It's possible that my stream contains data that is very old (when
> rewinding a topic, let's say it goes back to 2012). If I get the logic
> here correctly, this means I need to remember the last successfully
> parsed timestamp. Do you solve this via stateful processing?
>
>> * a following ParDo then side-outputs failures to dead letters
>
>> On Thu, Oct 25, 2018 at 8:54 AM Reuven Lax <re...@google.com> wrote:
>>
>>> On Wed, Oct 24, 2018, 10:26 PM Raghu Angadi <rang...@google.com> wrote:
>>>
>>>> Well, if every input record's timestamp is X, the watermark staying
>>>> at X is the right answer, no? But I am not sure where the
>>>> disagreement is, actually. I might be mistaken.
>>>>
>>>> KafkaIO has a few built-in policies for watermark and timestamp that
>>>> cover most use cases (including server time, which has the benefit of
>>>> providing a perfect watermark). It also gives users fairly complete
>>>> control over these if they choose. I think it seems reasonable for a
>>>> policy to base its watermark only on parsable records, and to ignore
>>>> unparsable records w.r.t. watermark calculation.
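Jozef's never-throwing deserializer can be sketched in plain Java. This is only a model: `SafeResult` and the "epochSeconds,payload" wire format are assumptions for illustration; a real implementation would implement Kafka's `Deserializer` interface and carry the application's actual record type.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;

// Model of a deserialization result that never throws: either parsed data
// plus a timestamp, or the original raw bytes kept for the dead-letter path.
class SafeResult {
    final String parsed;     // null on failure
    final Instant timestamp; // null on failure
    final byte[] rawBytes;   // original payload, always preserved

    private SafeResult(String parsed, Instant timestamp, byte[] rawBytes) {
        this.parsed = parsed;
        this.timestamp = timestamp;
        this.rawBytes = rawBytes;
    }

    boolean isSuccess() {
        return parsed != null;
    }

    /** Parses an assumed "epochSeconds,payload" format; never throws. */
    static SafeResult deserialize(byte[] bytes) {
        try {
            String s = new String(bytes, StandardCharsets.UTF_8);
            int comma = s.indexOf(',');
            Instant ts = Instant.ofEpochSecond(Long.parseLong(s.substring(0, comma)));
            return new SafeResult(s.substring(comma + 1), ts, bytes);
        } catch (RuntimeException e) {
            return new SafeResult(null, null, bytes); // failure: raw bytes only
        }
    }
}
```

The timestamp policy then reads `timestamp` from successes, and a downstream ParDo routes failures, with their original `rawBytes`, to the dead-letter output.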
>>>
>>> But then doesn't that force the user to set max allowed lateness to
>>> infinity? Otherwise these records will be dropped.
>>>
>>>> It could even assign a timestamp that makes more logical sense in a
>>>> particular application.
>>>>
>>>> On Wed, Oct 24, 2018 at 8:30 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>
>>>>> Forgive me if this is naive or missing something, but here are my
>>>>> thoughts on these alternatives:
>>>>>
>>>>> (0) The timestamp has to be pulled out in the source to control the
>>>>> watermark. Luke's point is important.
>>>>>
>>>>> (1) If bad records get min_timestamp and they occur infrequently
>>>>> enough, then the watermark will advance and they will all be
>>>>> dropped. That will not allow output to a dead-letter queue.
>>>>>
>>>>> (2) If you always have min_timestamp records, or if bad records are
>>>>> frequent, the watermark will never advance, so windows/aggregations
>>>>> would never be considered complete. Triggers could be used to get
>>>>> output anyhow, but it would never be a final answer. I think it is
>>>>> not in the spirit of Beam to work this way. Pragmatically, no state
>>>>> could ever be freed by a runner.
>>>>>
>>>>> In SQL there is an actual "dead letter" option when creating a table
>>>>> that parses from a bytes source. If, for example, a JSON record
>>>>> cannot be parsed to the expected schema - like maybe an Avro record
>>>>> got into the stream, or the JSON doesn't match the expected schema -
>>>>> it is output as-is to a user-specified dead-letter queue. I think
>>>>> this same level of support is also required for records that cannot
>>>>> have timestamps extracted in an unbounded source.
>>>>>
>>>>> In an SDF I think the function has enough control to do it all in
>>>>> "userland", so Cham is right on here.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> That depends on the user's pipeline and how watermark advancement of
>>>>>> the source may impact elements becoming droppably late if they are
>>>>>> emitted with the minimum timestamp.
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi <rang...@google.com> wrote:
>>>>>>
>>>>>>> I see.
>>>>>>>
>>>>>>> What I meant was to return min_timestamp for bad records in the
>>>>>>> timestamp handler passed to KafkaIO itself, and the correct
>>>>>>> timestamp for parsable records. That should work too, right?
>>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>
>>>>>>>> Yes, that would be fine.
>>>>>>>>
>>>>>>>> The user could then use a ParDo which outputs to a DLQ for things
>>>>>>>> it can't parse the timestamp for, and use outputWithTimestamp[1]
>>>>>>>> for everything else.
>>>>>>>>
>>>>>>>> 1: https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>>>>>>>>
>>>>>>>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <rang...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks. So returning the min timestamp is OK, right (assuming the
>>>>>>>>> application is fine with what it means)?
>>>>>>>>>
>>>>>>>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> All records in Apache Beam have a timestamp.
>>>>>>>>>> The default timestamp is the min timestamp defined here:
>>>>>>>>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <rang...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> You would have to return the min timestamp for all records,
>>>>>>>>>>>> otherwise the watermark may have advanced and you would be
>>>>>>>>>>>> outputting records that are droppably late.
>>>>>>>>>>>
>>>>>>>>>>> That would be fine, I guess. What's the timestamp for a record
>>>>>>>>>>> that doesn't have one?
>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <rang...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> To be clear, returning min_timestamp for unparsable records
>>>>>>>>>>>>> should not affect the watermark.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <rang...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> How about returning min_timestamp? They would be dropped or
>>>>>>>>>>>>>> redirected by the ParDo after that.
>>>>>>>>>>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public
>>>>>>>>>>>>>> API; is this pipeline defined under the kafkaio package?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In this case, the user is attempting to handle errors when
>>>>>>>>>>>>>>> parsing the timestamp. The timestamp controls the watermark
>>>>>>>>>>>>>>> for the UnboundedSource; how would they control the watermark
>>>>>>>>>>>>>>> in a downstream ParDo?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <rang...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Ah, nice. Yeah, if the user can return the full bytes
>>>>>>>>>>>>>>>>> instead of applying a function that would result in an
>>>>>>>>>>>>>>>>> exception, this can be extracted by a ParDo down the line.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> KafkaIO does return bytes, and I think most sources should,
>>>>>>>>>>>>>>>> unless there is a good reason not to.
>>>>>>>>>>>>>>>> Given that, do we think Beam should provide a transform that
>>>>>>>>>>>>>>>> makes it simpler to handle dead-letter output? I think there
>>>>>>>>>>>>>>>> was a thread about it in the past.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <jcgarc...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> As Raghu said,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> just apply a regular ParDo and return a PCollectionTuple;
>>>>>>>>>>>>>>>>>> after that you can extract your success records (TupleTag)
>>>>>>>>>>>>>>>>>> and your dead-letter records (TupleTag) and do whatever
>>>>>>>>>>>>>>>>>> you want with them.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Raghu Angadi <rang...@google.com> wrote on Wed, Oct 24, 2018, 05:18:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The user can read serialized bytes from KafkaIO and
>>>>>>>>>>>>>>>>>>> deserialize explicitly in a ParDo, which gives complete
>>>>>>>>>>>>>>>>>>> control over how to handle record errors. This is what I
>>>>>>>>>>>>>>>>>>> would do if I needed to in my pipeline.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If there is a transform in Beam that does this, it could
>>>>>>>>>>>>>>>>>>> be convenient for users in many such scenarios. This is
>>>>>>>>>>>>>>>>>>> simpler than each source supporting it explicitly.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework,
>>>>>>>>>>>>>>>>>>>> this is probably not something that can easily be
>>>>>>>>>>>>>>>>>>>> supported. We might be able to support similar features
>>>>>>>>>>>>>>>>>>>> when we have Kafka on top of Splittable DoFn, though.
>>>>>>>>>>>>>>>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This is a great question. I've added the dev list to be
>>>>>>>>>>>>>>>>>>>>> sure it gets noticed by whoever may know best.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Is there a way to get a dead-letter output from a
>>>>>>>>>>>>>>>>>>>>>> pipeline that uses a KafkaIO connector for its input?
>>>>>>>>>>>>>>>>>>>>>> As `TimestampPolicyFactory.withTimestampFn()` takes
>>>>>>>>>>>>>>>>>>>>>> only a SerializableFunction and not a ParDo, how would
>>>>>>>>>>>>>>>>>>>>>> I be able to produce a dead-letter output from it?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I have the following pipeline defined that reads from
>>>>>>>>>>>>>>>>>>>>>> a KafkaIO input:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>>>>>>>>     KafkaIO.<String, String>read()
>>>>>>>>>>>>>>>>>>>>>>         .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>>>>>>>>>>         .withTopics(topics)
>>>>>>>>>>>>>>>>>>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>>>>>>>>>>         .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>>>             ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>>>>>>>>>>>>>>>>>         .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>>>>>>>>>>>         .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>>>>>>>>>>>         .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>>>>>>>>>>>         .withTimestampPolicyFactory(
>>>>>>>>>>>>>>>>>>>>>>             TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>>>>>>>>>>                 new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>>>>>>>>>>         .withReadCommitted()
>>>>>>>>>>>>>>>>>>>>>>         .commitOffsetsInFinalize())
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> and I'd like to get dead-letter outputs when my
>>>>>>>>>>>>>>>>>>>>>> timestamp extraction fails.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>> Tobi

--
Tobias Kaymak
Data Engineer

tobias.kay...@ricardo.ch
www.ricardo.ch
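The approach the thread converges on, reading bytes, deserializing in a ParDo, and side-outputting failures, can be modeled without the Beam dependency. The class below is an illustrative stand-in for a DoFn used with `ParDo.withOutputTags`: the two lists play the role of the two TupleTags of the resulting PCollectionTuple, and the numeric parse stands in for real timestamp extraction.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a dead-letter split: every record lands on exactly one
// of two outputs, the way a multi-output ParDo fills a PCollectionTuple.
class DeadLetterSplit {
    final List<Long> success = new ArrayList<>();      // main output: parsed timestamps
    final List<String> deadLetter = new ArrayList<>(); // side output: originals kept as-is

    /** Routes one record; stands in for a DoFn.processElement with two TupleTags. */
    void process(String record) {
        try {
            // Illustrative parse step; a real pipeline would extract the
            // event timestamp from the deserialized message payload here.
            success.add(Long.parseLong(record));
        } catch (NumberFormatException e) {
            // Unparsable: preserve the original record for inspection or replay.
            deadLetter.add(record);
        }
    }
}
```

In Beam itself, the successful branch would additionally call `outputWithTimestamp` with the extracted timestamp, and the dead-letter branch would be written to a sink for later inspection.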