Hi,
I have filed an issue: https://github.com/apache/beam/issues/28760
I have also created a PR (based on our local fix for this):
https://github.com/apache/beam/pull/28763
This can serve as a start.

Thanks
Sachin


On Mon, Oct 2, 2023 at 2:54 AM Pavel Solomin <p.o.solo...@gmail.com> wrote:

> Hello, sorry for the late reply.
>
> EFOKinesisReader implements the same timestamp logic that the non-EFO
> KinesisReader had. At the time of the EFO implementation, a more careful
> evaluation of the records' timestamps was out of scope.
>
> Can you please create an issue at https://github.com/apache/beam/issues ?
> With an issue we can track this investigation, which may lead to a new PR
> or to some clarifications in the IO documentation.
>
> > We wanted the current timestamp to be based on a custom time embedded
> > within the record rather than on the approximate arrival time, and we
> > are not sure how we can achieve that.
>
> KinesisIO outputs only the byte[] of a message payload, without any decoding.
> If your timestamps sit in the messages' payload, I think this approach
> should work:
> https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
> On Fri, 21 Jul 2023 at 07:19, Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> Hi,
>> We are using the EFO Kinesis IO reader provided by Apache Beam.
>> I see in the code that the implementation of getCurrentTimestamp
>> always returns getApproximateArrivalTimestamp and not the event time
>> which we may have set for that record using withCustomWatermarkPolicy.
>>
>> Please refer:
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOKinesisReader.java#L91
>>
>> However, for KafkaIO we do something different:
>> getCurrentTimestamp is always based on the `timestampPolicy` set for
>> Kafka, where the user can emit a custom timestamp associated with each record.
>>
>> Please refer:
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L210
>>
>> So why is there a difference in these two implementations?
>>
>> We wanted the current timestamp to be based on a custom time embedded
>> within the record rather than on the approximate arrival time, and we are
>> not sure how we can achieve that.
>>
>> Please let us know if there is a way to achieve this for Kinesis.
>>
>> Thanks
>> Sachin
>>
>>

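For readers following the thread, the workaround discussed above (deriving the element timestamp from the message payload rather than from the approximate arrival time) could be sketched roughly as follows. This is a minimal, plain-Java illustration with no Beam dependency, not the code from the PR. The payload layout (UTF-8 text containing an `eventTimeMillis=<epoch millis>` token) and the names `EventTimeExtractor` and `extractEventTime` are hypothetical and chosen only for this example.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of Beam or KinesisIO. */
final class EventTimeExtractor {
    // Assumed payload convention: UTF-8 text with "eventTimeMillis=<digits>".
    // At most 18 digits are accepted so that Long.parseLong cannot overflow.
    private static final Pattern EVENT_TIME =
            Pattern.compile("eventTimeMillis=(\\d{1,18})(?!\\d)");

    private EventTimeExtractor() {}

    /**
     * Returns the event time embedded in the payload, or the approximate
     * arrival time if the payload carries no recognizable timestamp.
     */
    static Instant extractEventTime(byte[] payload, Instant approximateArrival) {
        String text = new String(payload, StandardCharsets.UTF_8);
        Matcher m = EVENT_TIME.matcher(text);
        if (m.find()) {
            return Instant.ofEpochMilli(Long.parseLong(m.group(1)));
        }
        return approximateArrival;
    }

    public static void main(String[] args) {
        byte[] payload = "id=7;eventTimeMillis=1689920340000"
                .getBytes(StandardCharsets.UTF_8);
        Instant arrival = Instant.now();
        System.out.println("event time: " + extractEventTime(payload, arrival));
    }
}
```

In a pipeline, logic like this would presumably sit in the function passed to Beam's `WithTimestamps.of(...)`, after converting to the Joda `Instant` type that Beam uses. One caveat: `WithTimestamps` rejects output timestamps that are earlier than the element's input timestamp unless a timestamp skew is allowed, and an embedded event time is typically earlier than the arrival time, which is part of why handling this inside the reader is attractive.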