Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3915

Hi @zjureel, I went ahead and addressed my own review concerns about this change in another PR that is based on your current work: #5282. I hope that is okay, and it would be great if you could review it.

The main changes are:

- We eagerly determine the offsets for timestamp-based startup. The `LATEST`, `EARLIEST`, and `GROUP_OFFSETS` startup modes still determine their offsets lazily, while `TIMESTAMP` and `SPECIFIC_OFFSETS` already have actual offsets before they are handled by the `KafkaConsumerThread`. It dawned on me that there is no reason to determine the offset lazily for timestamp-based startup, since the resulting offset does not depend on when we fetch the startup offsets.
- Don't use `Date` to define the timestamp; just use long values. The Kafka APIs take long timestamps, so I figured it makes sense for us to follow suit.
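
To make the eager/lazy split concrete, here is a minimal, self-contained sketch. It is not the code from either PR: the class `StartupOffsetsSketch`, the `RESOLVE_LAZILY` placeholder, and both helper methods are hypothetical names invented for illustration, and the broker lookup is simulated with an in-memory array of record timestamps rather than a real Kafka call (in the Kafka client, I believe `KafkaConsumer#offsetsForTimes` is the call that accepts `Long` timestamps). The sketch also assumes that a timestamp past the last record resolves to the end of the partition.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: none of these names are taken from the Flink Kafka connector.
final class StartupOffsetsSketch {

    enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS, TIMESTAMP, SPECIFIC_OFFSETS }

    // Hypothetical placeholder meaning "resolve this offset later, in the consumer thread".
    static final long RESOLVE_LAZILY = Long.MIN_VALUE;

    // Stand-in for a broker lookup: returns the first offset whose record timestamp
    // is >= timestampMillis, or the partition length if there is no such record
    // (an assumption made for this sketch).
    static long offsetForTimestamp(long[] recordTimestamps, long timestampMillis) {
        for (int offset = 0; offset < recordTimestamps.length; offset++) {
            if (recordTimestamps[offset] >= timestampMillis) {
                return offset;
            }
        }
        return recordTimestamps.length;
    }

    // TIMESTAMP and SPECIFIC_OFFSETS get concrete offsets up front;
    // every other mode keeps the placeholder and is resolved later.
    static Map<String, Long> initialOffsets(
            StartupMode mode,
            Map<String, long[]> partitions,
            long startupTimestampMillis, // a plain long, not a java.util.Date
            Map<String, Long> specificOffsets) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, long[]> entry : partitions.entrySet()) {
            String partition = entry.getKey();
            switch (mode) {
                case TIMESTAMP:
                    result.put(partition,
                            offsetForTimestamp(entry.getValue(), startupTimestampMillis));
                    break;
                case SPECIFIC_OFFSETS:
                    result.put(partition,
                            specificOffsets.getOrDefault(partition, RESOLVE_LAZILY));
                    break;
                default:
                    result.put(partition, RESOLVE_LAZILY);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, long[]> partitions = new HashMap<>();
        partitions.put("topic-0", new long[] {1000L, 2000L, 3000L});
        System.out.println(
                initialOffsets(StartupMode.TIMESTAMP, partitions, 1500L, new HashMap<>()));
        System.out.println(
                initialOffsets(StartupMode.LATEST, partitions, 0L, new HashMap<>()));
    }
}
```

Because the records already written before a given timestamp do not change, resolving the `TIMESTAMP` case up front yields the same offset as resolving it later, which is the argument for doing it eagerly.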
---