Hi,

We are using KafkaIO.read() with LogAppendTimePolicy. When the topic is
idle at the beginning of the pipeline, IllegalStateException is  NOT thrown
even when log.message.timestamp.type = CreateTime.

This happens due to the statement:
else if (currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) in
getTimestampForRecord() method in TimestampPolicyFactory Interface.

As the topic is idle at the beginning of the pipeline, the currentWatermark
is advanced (backlog==0), because of which
currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE) is False and the
timestamp of the records are taken as currentWatermark.

If we change else if() to else, IllegalStateException is thrown when the
first record from the Kafka topic is read, which is expected.
Is there any specific reason behind using else if() instead of else?

Thanks and Regards,
Rahul

Reply via email to