Hello, sorry for the late reply. EFOKinesisReader implements the same timestamp logic that the non-EFO KinesisReader had. When EFO was implemented, a more careful evaluation of the records' timestamps was out of scope.
Can you please create an issue at https://github.com/apache/beam/issues ? With an issue we can track this investigation which may become a new PR or some clarifications in the IO documentation.

> We wanted the current timestamp based on some custom time embedded within
> the record and not approximate arrival time and not sure how we can achieve
> that.

KinesisIO outputs only byte[] of a message payload without any decoding. If your timestamps sit in the messages' payload, I think, this approach should work:
https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin <https://www.linkedin.com/in/pavelsolomin>

On Fri, 21 Jul 2023 at 07:19, Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> We are implementing EFO Kinesis IO reader provided by apache beam.
> I see that in code that for implementation of getCurrentTimestamp we
> always return getApproximateArrivalTimestamp and not the event time which
> we may have set for that record using withCustomWatermarkPolicy.
>
> Please refer:
> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOKinesisReader.java#L91
>
> However for KafkaIO we do something different:
> We always get the getCurrentTimestamp based on `timestampPolicy` set for
> Kafka where user can emit a custom timestamp associated with each record.
>
> Please refer:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L210
>
> So why is there a difference in these two implementations?
>
> We wanted the current timestamp based on some custom time embedded within
> the record and not approximate arrival time and not sure how we can achieve
> that.
>
> Please let us know if there is a way out to achieve this for Kinesis.
>
> Thanks
> Sachin
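
P.S. To make the payload-timestamp suggestion in this thread more concrete, here is a minimal sketch of the decoding step only. It is not taken from KinesisIO: the class `PayloadTimestamps`, the method `eventTimeMillis`, and the payload layout (UTF-8 text of the form `<ISO-8601 instant>|<body>`) are all hypothetical and stand in for whatever format your records really use.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;

/** Hypothetical helper: pulls an event time out of a raw record payload. */
final class PayloadTimestamps {
  private PayloadTimestamps() {}

  /**
   * Assumes, for illustration only, that the payload is UTF-8 text of the
   * form "<ISO-8601 instant>|<body>", e.g. "2023-07-21T06:19:00Z|{...}".
   * Returns the embedded event time as milliseconds since the epoch.
   */
  static long eventTimeMillis(byte[] payload) {
    String text = new String(payload, StandardCharsets.UTF_8);
    int sep = text.indexOf('|');
    if (sep < 0) {
      throw new IllegalArgumentException("payload has no '|' separator");
    }
    // Instant.parse throws DateTimeParseException on a malformed prefix.
    return Instant.parse(text.substring(0, sep)).toEpochMilli();
  }

  public static void main(String[] args) {
    byte[] payload = "1970-01-01T00:00:01Z|hello".getBytes(StandardCharsets.UTF_8);
    System.out.println(eventTimeMillis(payload)); // prints 1000
  }
}
```

In a pipeline, a function like this could be passed to `WithTimestamps.of(...)`, as described in the programming guide section linked above, wrapping the result in the Joda `Instant` that Beam expects. One caveat as far as I understand it: the embedded event time is normally earlier than the approximate arrival time the reader already assigned, and Beam only accepts an earlier output timestamp within the allowed timestamp skew, so please check that setting against your late-data handling.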