What I ended up doing, when I could not for any reason rely on Kafka timestamps but needed to parse them from the message, is:

* have a custom Kafka deserializer which never throws, but returns a message which is either a success with the parsed data structure plus timestamp, or a failure with the original Kafka bytes payload (see the sketch after this list)
* a timestamp policy that can extract the timestamp from a successful deserialization result; for a failure result I return the timestamp of the last successful message (in my case messages are not terribly out of order and failures are rather rare)
* a following ParDo that side-outputs failures to dead letters
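A rough sketch of the first two pieces. MyEvent and its parse()/timestamp() methods are stand-ins for the application's own type; this is illustrative, not the exact code:

import java.util.Map;
import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.kafka.common.serialization.Deserializer;
import org.joda.time.Instant;

// Either a parsed event plus its event timestamp, or the raw payload on failure.
class ParseResult {
  final MyEvent event;      // null on failure; MyEvent is hypothetical
  final Instant timestamp;  // null on failure
  final byte[] rawBytes;    // null on success

  ParseResult(MyEvent event, Instant timestamp, byte[] rawBytes) {
    this.event = event;
    this.timestamp = timestamp;
    this.rawBytes = rawBytes;
  }

  boolean isSuccess() {
    return event != null;
  }
}

// A value deserializer that never throws: parse failures become failure results.
class SafeDeserializer implements Deserializer<ParseResult> {
  @Override public void configure(Map<String, ?> configs, boolean isKey) {}
  @Override public void close() {}

  @Override
  public ParseResult deserialize(String topic, byte[] data) {
    try {
      MyEvent event = MyEvent.parse(data); // hypothetical parser, may throw
      return new ParseResult(event, event.timestamp(), null);
    } catch (Exception e) {
      return new ParseResult(null, null, data); // keep the raw bytes for the DLQ
    }
  }
}

// Timestamp policy: the parsed timestamp for successes, the timestamp of the
// last successful record for failures (OK when records are roughly in order
// and failures are rare).
class LastSuccessTimestampPolicy extends TimestampPolicy<String, ParseResult> {
  private Instant lastSuccess;

  LastSuccessTimestampPolicy(Optional<Instant> previousWatermark) {
    lastSuccess = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
  }

  @Override
  public Instant getTimestampForRecord(
      PartitionContext ctx, KafkaRecord<String, ParseResult> record) {
    ParseResult result = record.getKV().getValue();
    if (result.isSuccess()) {
      lastSuccess = result.timestamp;
    }
    return lastSuccess;
  }

  @Override
  public Instant getWatermark(PartitionContext ctx) {
    return lastSuccess;
  }
}

The policy is wired in with something like .withTimestampPolicyFactory((tp, previousWatermark) -> new LastSuccessTimestampPolicy(previousWatermark)); the failure branch is then split off to a dead-letter output with a multi-output ParDo like the ones sketched further down the thread.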
On Thu, Oct 25, 2018 at 8:54 AM Reuven Lax <[email protected]> wrote:

>
> On Wed, Oct 24, 2018, 10:26 PM Raghu Angadi <[email protected]> wrote:
>
>> Well, if every input record's timestamp is X, the watermark staying at X is the right answer, no? But I am not sure where the disagreement is, actually. I might be mistaken.
>>
>> KafkaIO has a few built-in policies for watermarks and timestamps that cover most use cases (including server time, which has the benefit of providing a perfect watermark). It also gives users fairly complete control over these if they choose. I think it is reasonable for a policy to base its watermark only on parsable records and to ignore unparsable records w.r.t. watermark calculation.
>
> But then doesn't that force the user to set max allowed lateness to infinity? Otherwise these records will be dropped.
>
>> It could even assign a timestamp that makes more logical sense in a particular application.
>>
>> On Wed, Oct 24, 2018 at 8:30 PM Kenneth Knowles <[email protected]> wrote:
>>
>>> Forgive me if this is naive or missing something, but here are my thoughts on these alternatives:
>>>
>>> (0) The timestamp has to be pulled out in the source to control the watermark. Luke's point is important.
>>>
>>> (1) If bad records get min_timestamp and they occur infrequently enough, the watermark will advance and they will all be dropped. That will not allow output to a dead-letter queue.
>>>
>>> (2) If you always have min_timestamp records, or if bad records are frequent, the watermark will never advance. So windows/aggregations would never be considered complete. Triggers could be used to get output anyhow, but it would never be a final answer. I think it is not in the spirit of Beam to work this way. Pragmatically, no state could ever be freed by a runner.
>>>
>>> In SQL there is an actual "dead letter" option when creating a table that parses from a bytes source. If, for example, a JSON record cannot be parsed to the expected schema - like maybe an Avro record got in the stream, or the JSON doesn't match the expected schema - it is output as-is to a user-specified dead-letter queue. I think this same level of support is also required for records that cannot have timestamps extracted in an unbounded source.
>>>
>>> In an SDF I think the function has enough control to do it all in "userland", so Cham is right on here.
>>>
>>> Kenn
>>>
>>> On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik <[email protected]> wrote:
>>>
>>>> That depends on the user's pipeline and how watermark advancement of the source may impact elements becoming droppably late if they are emitted with the minimum timestamp.
>>>>
>>>> On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi <[email protected]> wrote:
>>>>
>>>>> I see.
>>>>>
>>>>> What I meant was to return min_timestamp for bad records in the timestamp handler passed to KafkaIO itself, and the correct timestamp for parsable records. That should work too, right?
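A minimal sketch of that suggestion, plugged into the same TimestampPolicyFactory.withTimestampFn() hook that the pipeline quoted at the bottom of this thread already uses; extractTimestamp() is a hypothetical parser that throws on bad records:

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

// Return the parsed event time for good records and the minimum timestamp
// for bad ones; a downstream ParDo can then drop or redirect the bad records.
class MinTimestampOnFailureFn
    implements SerializableFunction<KafkaRecord<String, String>, Instant> {

  @Override
  public Instant apply(KafkaRecord<String, String> record) {
    try {
      return extractTimestamp(record.getKV().getValue());
    } catch (Exception e) {
      return BoundedWindow.TIMESTAMP_MIN_VALUE;
    }
  }

  private static Instant extractTimestamp(String value) {
    // Hypothetical: pull the event time out of the message body,
    // throwing if the message cannot be parsed.
    return Instant.parse(value);
  }
}

Note the caveat raised elsewhere in the thread: once the watermark advances, min-timestamp records can become droppably late, so this only works together with a dead-letter redirect like the one below.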
>>>>> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> Yes, that would be fine.
>>>>>>
>>>>>> The user could then use a ParDo which outputs to a DLQ for things it can't parse the timestamp for, and use outputWithTimestamp [1] for everything else.
>>>>>>
>>>>>> 1: https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
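A sketch of that ParDo, assuming the source assigned the minimum timestamp to unparsable records as above. Here messages is the PCollection<String> of message values read from Kafka (e.g. after withoutMetadata() and Values.create()), and parseEventTime() is a hypothetical parser that throws on failure:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Instant;

final TupleTag<String> parsedTag = new TupleTag<String>() {};
final TupleTag<String> deadLetterTag = new TupleTag<String>() {};

PCollectionTuple result = messages.apply(
    "RestampOrDeadLetter",
    ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void process(ProcessContext c) {
        try {
          // Moving the timestamp forward from the minimum is always allowed.
          Instant eventTime = parseEventTime(c.element()); // hypothetical
          c.outputWithTimestamp(c.element(), eventTime);
        } catch (Exception e) {
          c.output(deadLetterTag, c.element()); // keeps the min timestamp
        }
      }
    }).withOutputTags(parsedTag, TupleTagList.of(deadLetterTag)));

PCollection<String> parsed = result.get(parsedTag);
PCollection<String> deadLetters = result.get(deadLetterTag); // write to a DLQ sink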
>>>>>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <[email protected]> wrote:
>>>>>>
>>>>>>> Thanks. So returning min timestamp is OK, right (assuming the application is fine with what it means)?
>>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>>
>>>>>>>> All records in Apache Beam have a timestamp. The default timestamp is the min timestamp defined here: https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>>>>>>>
>>>>>>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> You would have to return min timestamp for all records, otherwise the watermark may have advanced and you would be outputting records that are droppably late.
>>>>>>>>>
>>>>>>>>> That would be fine, I guess. What's the timestamp for a record that doesn't have one?
>>>>>>>>>
>>>>>>>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> To be clear, returning min_timestamp for unparsable records should not affect the watermark.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> How about returning min_timestamp? They would be dropped or redirected by the ParDo after that. Btw, TimestampPolicyFactory.withTimestampFn() is not a public API; is this pipeline defined under the kafkaio package?
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> In this case, the user is attempting to handle errors when parsing the timestamp. The timestamp controls the watermark for the UnboundedSource; how would they control the watermark in a downstream ParDo?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Ah nice. Yeah, if the user can return full bytes instead of applying a function that would result in an exception, this can be extracted by a ParDo down the line.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> KafkaIO does return bytes, and I think most sources should, unless there is a good reason not to. Given that, do we think Beam should provide a transform that makes it simpler to handle dead-letter output? I think there was a thread about it in the past.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <[email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> As Raghu said,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Just apply a regular ParDo and return a PCollectionTuple. After that you can extract your success records (TupleTag) and your dead-letter records (TupleTag) and do whatever you want with them.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Raghu Angadi <[email protected]> wrote on Wed, Oct 24, 2018, 05:18:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The user can read serialized bytes from KafkaIO and deserialize explicitly in a ParDo, which gives complete control over how to handle record errors. This is what I would do if I needed to in my pipeline.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If there is a transform in Beam that does this, it could be convenient for users in many such scenarios. This is simpler than each source supporting it explicitly.
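A sketch of that shape, assuming KafkaIO.readBytes() (which presets byte-array deserializers) and a hypothetical decode() that throws on bad input; bootstrap, topics, and pipeline are as in the snippet quoted at the bottom, and MyEvent is again a stand-in for the application's type:

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

final TupleTag<MyEvent> okTag = new TupleTag<MyEvent>() {};
final TupleTag<byte[]> failedTag = new TupleTag<byte[]>() {};

PCollectionTuple out = pipeline
    .apply(KafkaIO.readBytes()
        .withBootstrapServers(bootstrap)
        .withTopics(topics))
    .apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, MyEvent>() {
      @ProcessElement
      public void process(ProcessContext c) {
        byte[] payload = c.element().getKV().getValue();
        try {
          c.output(decode(payload));    // hypothetical decoder, may throw
        } catch (Exception e) {
          c.output(failedTag, payload); // raw bytes go to the dead-letter side
        }
      }
    }).withOutputTags(okTag, TupleTagList.of(failedTag)));

This keeps all error handling in user code; timestamps can still be assigned at the source from Kafka metadata, or downstream with outputWithTimestamp as in the earlier sketch.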
>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <[email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is probably not something that can easily be supported. We might be able to support similar features when we have Kafka on top of Splittable DoFn, though. So feel free to create a feature request JIRA for this.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This is a great question. I've added the dev list to be sure it gets noticed by whoever may know best.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Is there a way to get a dead-letter output from a pipeline that uses a KafkaIO connector for its input? As `TimestampPolicyFactory.withTimestampFn()` takes only a SerializableFunction and not a ParDo, how would I be able to produce a dead-letter output from it?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I have the following pipeline defined that reads from a KafkaIO input:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>>>>>>>>>>>>>>       .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>>>>>>>>       .withTopics(topics)
>>>>>>>>>>>>>>>>>>>>       .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>>>>>>>>       .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>>>>>>>>       .updateConsumerProperties(
>>>>>>>>>>>>>>>>>>>>           ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>>>>>>>>>>>>>>>       .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>>>>>>>>>       .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>>>>>>>>>       .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>>>>>>>>>       .withTimestampPolicyFactory(
>>>>>>>>>>>>>>>>>>>>           TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>>>>>>>>               new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>>>>>>>>       .withReadCommitted()
>>>>>>>>>>>>>>>>>>>>       .commitOffsetsInFinalize())
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> and I'd like to get dead-letter outputs when my timestamp extraction fails.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>> Tobi
