Hi,

I have filed an issue:
https://github.com/apache/beam/issues/28760

I have also created a PR (based on our local fix for this):
https://github.com/apache/beam/pull/28763

This can serve as a start.

Thanks
Sachin

On Mon, Oct 2, 2023 at 2:54 AM Pavel Solomin <p.o.solo...@gmail.com> wrote:

> Hello, sorry for the late reply.
>
> EFOKinesisReader implemented the same logic of timestamps non-EFO
> KinesisReader had. At the time of EFO implementation more careful
> evaluation of the records' timestamps was out of context.
>
> Can you please create an issue at https://github.com/apache/beam/issues ?
> With an issue we can track this investigation which may become a new PR or
> some clarifications in the IO documentation.
>
> > We wanted the current timestamp based on some custom time embedded
> > within the record and not approximate arrival time and not sure how we can
> > achieve that.
>
> KinesisIO outputs only byte[] of a message payload without any decoding.
> If your timestamps sit in the messages' payload, I think, this approach
> should work:
> https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
> On Fri, 21 Jul 2023 at 07:19, Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> Hi,
>> We are implementing EFO Kinesis IO reader provided by apache beam.
>> I see that in code that for implementation of getCurrentTimestamp we
>> always return getApproximateArrivalTimestamp and not the event time
>> which we may have set for that record using withCustomWatermarkPolicy.
>>
>> Please refer:
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOKinesisReader.java#L91
>>
>> However for KafkaIO we do something different:
>> We always get the getCurrentTimestamp based on `timestampPolicy` set for
>> Kafka where user can emit a custom timestamp associated with each record.
>>
>> Please refer:
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L210
>>
>> So why is there a difference in these two implementations?
>>
>> We wanted the current timestamp based on some custom time embedded within
>> the record and not approximate arrival time and not sure how we can achieve
>> that.
>>
>> Please let us know if there is a way out to achieve this for Kinesis.
>>
>> Thanks
>> Sachin
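
For reference, the payload-based approach suggested in the thread (decode the raw byte[] and derive the event time from it) might look roughly like the sketch below. It is plain Java with no Beam dependency. The payload layout ("<epochMillis>|<body>" in UTF-8), the class name PayloadTimestamps and the method extractEventTime are assumptions made up for illustration; the real record format is not given in the thread. In a pipeline, a function like this could be handed to a timestamp-assigning step such as Beam's WithTimestamps, after converting the result to the Joda Instant type Beam uses.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;

// Hypothetical helper, not part of Beam or KinesisIO.
final class PayloadTimestamps {

    // Assumed payload layout (illustration only): "<epochMillis>|<body>", UTF-8 encoded.
    static Instant extractEventTime(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        int sep = text.indexOf('|');
        if (sep <= 0) {
            // No separator, or nothing before it: there is no timestamp field to read.
            throw new IllegalArgumentException("payload has no leading timestamp field");
        }
        // Long.parseLong throws NumberFormatException (an IllegalArgumentException)
        // if the prefix is not a valid number.
        long epochMillis = Long.parseLong(text.substring(0, sep));
        return Instant.ofEpochMilli(epochMillis);
    }

    public static void main(String[] args) {
        byte[] payload = "1696212840000|hello".getBytes(StandardCharsets.UTF_8);
        // Prints the event time parsed from the payload prefix in ISO-8601 form.
        System.out.println(extractEventTime(payload));
    }
}
```

This only covers the element timestamp; whether the reader's watermark should also follow the embedded time is the question tracked in the issue above.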