Hi, we are using KafkaIO.read() with LogAppendTimePolicy. When the topic is idle at the start of the pipeline, no IllegalStateException is thrown, even though the topic is configured with log.message.timestamp.type = CreateTime.
This happens because of the condition else if (currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) in the getTimestampForRecord() method of LogAppendTimePolicy in the TimestampPolicyFactory interface. Because the topic is idle at the start of the pipeline, currentWatermark is advanced (backlog == 0) before the first record arrives. As a result, currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE) evaluates to false, the check is skipped, and the records are assigned currentWatermark as their timestamp.

If we change else if() to else, IllegalStateException is thrown when the first record is read from the Kafka topic, which is the expected behavior. Is there a specific reason for using else if() instead of else?

Thanks and Regards,
Rahul
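P.S. To make the sequence above easier to follow, here is a minimal, self-contained sketch of the behavior as I understand it. It is not the actual Beam code: the class LogAppendTimePolicySketch, the alwaysCheck flag, the method signatures, and the 2-second IDLE_DELTA are all assumptions made for illustration, and Instant.MIN stands in for BoundedWindow.TIMESTAMP_MIN_VALUE.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified stand-in for the policy described above; not the actual Beam class. */
public class LogAppendTimePolicySketch {

    public enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    // Stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE.
    public static final Instant MIN_WATERMARK = Instant.MIN;
    // Assumed lag applied to the watermark while the partition is idle.
    public static final Duration IDLE_DELTA = Duration.ofSeconds(2);

    // false models the current "else if" check, true models the proposed "else".
    private final boolean alwaysCheck;
    private Instant currentWatermark = MIN_WATERMARK;

    public LogAppendTimePolicySketch(boolean alwaysCheck) {
        this.alwaysCheck = alwaysCheck;
    }

    /** Advances the watermark while there is no backlog, i.e. the topic is idle. */
    public Instant getWatermark(long backlog, Instant backlogCheckTime) {
        if (backlog == 0) {
            Instant idleWatermark = backlogCheckTime.minus(IDLE_DELTA);
            if (idleWatermark.isAfter(currentWatermark)) {
                currentWatermark = idleWatermark;
            }
        }
        return currentWatermark;
    }

    /** Returns the timestamp assigned to a record, or throws on a wrong timestamp type. */
    public Instant getTimestampForRecord(TimestampType type, Instant recordTimestamp) {
        if (type == TimestampType.LOG_APPEND_TIME) {
            currentWatermark = recordTimestamp;
        } else if (alwaysCheck || currentWatermark.equals(MIN_WATERMARK)) {
            throw new IllegalStateException(
                "Record timestamp type is " + type + ", expected LOG_APPEND_TIME");
        }
        return currentWatermark;
    }

    public static void main(String[] args) {
        Instant checkTime = Instant.parse("2020-01-01T00:00:10Z");
        Instant createTime = Instant.parse("2020-01-01T00:00:00Z");

        // Current behavior: the idle topic advances the watermark first,
        // so the CreateTime record is accepted silently.
        LogAppendTimePolicySketch current = new LogAppendTimePolicySketch(false);
        current.getWatermark(0, checkTime);
        System.out.println("else if: "
            + current.getTimestampForRecord(TimestampType.CREATE_TIME, createTime));

        // Proposed behavior: the same sequence is rejected.
        LogAppendTimePolicySketch proposed = new LogAppendTimePolicySketch(true);
        proposed.getWatermark(0, checkTime);
        try {
            proposed.getTimestampForRecord(TimestampType.CREATE_TIME, createTime);
        } catch (IllegalStateException e) {
            System.out.println("else: " + e.getMessage());
        }
    }
}
```

With alwaysCheck set to false, the exception is only raised if a CreateTime record arrives before the watermark has moved off its minimum value, which is the case I am asking about.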
