psolomin commented on issue #28760: URL: https://github.com/apache/beam/issues/28760#issuecomment-1747717875
Hi @sjmittal, thank you for filing this issue! The references to KafkaIO are quite useful too.

I've checked other implementations of `UnboundedSource.UnboundedReader` and their `getCurrentTimestamp` methods, namely:

- `PubsubReader` uses either the [publish time or a custom timestamp set explicitly by the publisher](https://github.com/apache/beam/blob/c2666e114971a4727919d9c95287a4ceaf486a92/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java#L273).
- `UnboundedRabbitMqReader` [hard-codes](https://github.com/apache/beam/blob/c2666e114971a4727919d9c95287a4ceaf486a92/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java#L571) the delivery timestamp attribute and falls back to `Instant.now()` only if that attribute is not present.
- `SqsUnboundedReader` [uses the publish timestamp too](https://github.com/apache/beam/blob/6f4e2852311db381a622dec3ebf26654c9372806/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsMessage.java#L36).

These implementations make me think that the `getCurrentTimestamp` of `KinesisIO` follows the semantics commonly used by other readers.

On the mailing list you wrote:

> We wanted the current timestamp based on some custom time embedded within the record and not approximate arrival time and not sure how we can achieve that.

KinesisIO outputs only the `byte[]` of a message payload, without any decoding. If your timestamps sit in the messages' payload, I think this approach should work: https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements

Therefore I am not sure the `KinesisIO` code should change. Any thoughts, @aromanenko-dev @mosche?
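To illustrate the suggestion of reading the event time from the payload rather than relying on the reader's arrival-based timestamp, here is a minimal, dependency-free sketch of the extraction step. The payload layout is purely an assumption for illustration (the first 8 bytes hold the event time as big-endian epoch milliseconds, followed by a UTF-8 body), and the class `PayloadTimestamps` and its methods are hypothetical; a real stream would decode whatever format its producers actually write.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;

/**
 * Hypothetical helper. Assumed payload layout (not a KinesisIO format):
 * bytes 0..7 = event time as big-endian epoch millis, then a UTF-8 body.
 */
public final class PayloadTimestamps {

  private PayloadTimestamps() {}

  /** Reads the embedded event time from the first 8 bytes of the payload. */
  public static Instant extractEventTime(byte[] payload) {
    if (payload == null || payload.length < Long.BYTES) {
      throw new IllegalArgumentException("payload too short to contain an event time");
    }
    // ByteBuffer uses big-endian byte order by default.
    long epochMillis = ByteBuffer.wrap(payload, 0, Long.BYTES).getLong();
    return Instant.ofEpochMilli(epochMillis);
  }

  /** Builds a payload in the assumed layout; useful for local testing. */
  public static byte[] encode(Instant eventTime, String body) {
    byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(Long.BYTES + bodyBytes.length)
        .putLong(eventTime.toEpochMilli())
        .put(bodyBytes)
        .array();
  }

  public static void main(String[] args) {
    Instant eventTime = Instant.parse("2023-10-04T12:00:00Z");
    byte[] payload = encode(eventTime, "{\"id\":1}");
    System.out.println(extractEventTime(payload)); // prints 2023-10-04T12:00:00Z
  }
}
```

In a Beam pipeline, the same logic could be applied to the `byte[]` payload inside a `DoFn` that calls `outputWithTimestamp`, as in the programming guide section linked above, or wrapped in `WithTimestamps.of(...)`. Note that Beam's Java SDK uses Joda's `org.joda.time.Instant`, so the epoch millis would be converted with `new org.joda.time.Instant(epochMillis)`. Also note that moving an element's timestamp earlier than the one assigned by the source is limited by the allowed timestamp skew.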
