Hello, sorry for the late reply.

EFOKinesisReader implemented the same timestamp logic that the non-EFO
KinesisReader had. At the time of the EFO implementation, a more careful
evaluation of the records' timestamps was out of scope.

Could you please create an issue at https://github.com/apache/beam/issues
so we can track this investigation? It may result in a new PR or in some
clarifications in the IO documentation.

> We wanted the current timestamp to be based on some custom time embedded
> within the record, not the approximate arrival time, and we are not sure how
> we can achieve that.

KinesisIO exposes the message payload only as raw bytes (byte[]), without any
decoding. If your timestamps are embedded in the message payload, I think this
approach should work:
https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements
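As a rough sketch only: suppose (purely an assumption, since the payload format
isn't known here) that producers prepend the event time to each record as 8 bytes
of big-endian epoch milliseconds. A plain-Java helper like the hypothetical
`PayloadTimestamps` below could then decode it; the class name, method names and
byte layout are all illustrative, not part of KinesisIO.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Arrays;

/**
 * Hypothetical helper for extracting an event time embedded in a Kinesis payload.
 * ASSUMPTION: the first 8 bytes of the payload hold the event time as big-endian
 * epoch milliseconds, followed by the actual message body.
 */
final class PayloadTimestamps {
  private PayloadTimestamps() {}

  /** Reads the leading 8-byte epoch-millis prefix as the record's event time. */
  public static Instant eventTime(byte[] payload) {
    if (payload == null || payload.length < Long.BYTES) {
      throw new IllegalArgumentException("payload too short to contain an event timestamp");
    }
    // ByteBuffer uses big-endian byte order by default.
    long epochMillis = ByteBuffer.wrap(payload).getLong();
    return Instant.ofEpochMilli(epochMillis);
  }

  /** Returns the message body that follows the 8-byte timestamp prefix. */
  public static byte[] body(byte[] payload) {
    eventTime(payload); // validates the length before slicing
    return Arrays.copyOfRange(payload, Long.BYTES, payload.length);
  }

  /** Builds a payload in the assumed layout; useful for tests. */
  public static byte[] encode(Instant eventTime, String body) {
    byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(Long.BYTES + bodyBytes.length)
        .putLong(eventTime.toEpochMilli())
        .put(bodyBytes)
        .array();
  }
}
```

In the pipeline, the decoded value would be converted to a Joda
`org.joda.time.Instant` and returned from the function passed to
`WithTimestamps.of(...)`, applied right after the KinesisIO read (for example on
`KinesisRecord.getDataAsBytes()`). Keep in mind that, by default, `WithTimestamps`
does not allow moving an element's timestamp earlier than its current one (here
the approximate arrival time), and re-stamped records may fall behind the source
watermark and be treated as late data.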

Best Regards,
Pavel Solomin

On Fri, 21 Jul 2023 at 07:19, Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> We are implementing the EFO Kinesis IO reader provided by Apache Beam.
> I see in the code that the implementation of getCurrentTimestamp always
> returns getApproximateArrivalTimestamp, not the event time we may have set
> for that record using withCustomWatermarkPolicy.
>
> Please refer:
>
> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOKinesisReader.java#L91
>
> However, KafkaIO does something different:
> getCurrentTimestamp is always based on the `timestampPolicy` set for
> Kafka, where the user can emit a custom timestamp associated with each record.
>
> Please refer:
>
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L210
>
> So why is there a difference between these two implementations?
>
> We wanted the current timestamp to be based on some custom time embedded
> within the record, not the approximate arrival time, and we are not sure how
> we can achieve that.
>
> Please let us know if there is a way to achieve this for Kinesis.
>
> Thanks
> Sachin
>
>
