The intention was to assert on 'timestamp_type' for the first record only. I was not entirely sure whether there are situations in Kafka where the timestamp type could differ, or the timestamp itself could be missing, for some records. The assertion on the first record was just a sanity check for a common misconfiguration. However, the way this policy detects the first record is incorrect for idle partitions, because the watermark advances even without any records being read. That is the issue you encountered. When a record's timestamp type does not match, its timestamp is not used to advance the watermark.
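To make the idle-partition case concrete, here is a minimal, self-contained Java sketch of the check being discussed. It does not use Beam or Kafka classes. `TimestampType`, `MIN_WATERMARK` (a stand-in for `BoundedWindow.TIMESTAMP_MIN_VALUE`), `IDLE_LAG_MILLIS` and the class name are all hypothetical simplifications. The idle rule (advance the watermark to "now minus a small lag" when backlog is 0) is an assumption about roughly how the policy behaves, not a copy of the real code. The `strict` flag switches between the current `else if` and the proposed `else`.

```java
public class LogAppendTimeCheckSketch {

  // Hypothetical stand-in for Kafka's record timestamp types.
  enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

  // Hypothetical stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE, as epoch millis.
  static final long MIN_WATERMARK = Long.MIN_VALUE;

  // Hypothetical lag applied when the partition is idle (backlog == 0).
  static final long IDLE_LAG_MILLIS = 2_000;

  private final boolean strict;
  private long currentWatermark = MIN_WATERMARK;

  LogAppendTimeCheckSketch(boolean strict) {
    this.strict = strict;
  }

  // An idle partition advances the watermark even though no record was read.
  long getWatermark(long backlog, long nowMillis) {
    if (backlog == 0) {
      currentWatermark = Math.max(currentWatermark, nowMillis - IDLE_LAG_MILLIS);
    }
    return currentWatermark;
  }

  // strict == false: the current 'else if' (rejects only while the watermark is at its minimum).
  // strict == true:  the proposed 'else' (rejects every non-LOG_APPEND_TIME record).
  long getTimestampForRecord(TimestampType type, long recordTimestamp) {
    if (type == TimestampType.LOG_APPEND_TIME) {
      currentWatermark = recordTimestamp;
    } else if (strict || currentWatermark == MIN_WATERMARK) {
      throw new IllegalStateException("Expected LOG_APPEND_TIME but got " + type);
    }
    // A mismatched record (lenient mode) simply gets the current watermark.
    return currentWatermark;
  }

  // Simulates a topic that is idle at pipeline start, then receives a CreateTime record.
  static boolean throwsOnCreateTimeAfterIdle(boolean strict) {
    LogAppendTimeCheckSketch policy = new LogAppendTimeCheckSketch(strict);
    policy.getWatermark(0, 1_000_000L); // backlog == 0, so the watermark moves off its minimum
    try {
      policy.getTimestampForRecord(TimestampType.CREATE_TIME, 999_000L);
      return false;
    } catch (IllegalStateException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println("else if: throws = " + throwsOnCreateTimeAfterIdle(false));
    System.out.println("else:    throws = " + throwsOnCreateTimeAfterIdle(true));
  }
}
```

With the lenient check the misconfiguration goes unnoticed once the idle watermark has moved, while the strict check rejects the record. The trade-off is that the strict version would also reject a topic that legitimately mixes timestamp types.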
As you suggested, a simpler fix might be to require every record's timestamp_type to be LOG_APPEND_TIME (i.e. replace 'else if' with 'else'). Is that safe? We don't want users to get stuck if some topics are expected to have multiple timestamp types.

Raghu.

On Thu, Jan 3, 2019 at 11:22 PM rahul patwari <[email protected]> wrote:
> Hi,
>
> We are using KafkaIO.read() with LogAppendTimePolicy. When the topic is
> idle at the beginning of the pipeline, IllegalStateException is NOT thrown
> even when log.message.timestamp.type = CreateTime.
>
> This happens due to the statement:
> else if (currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) in
> getTimestampForRecord() method in TimestampPolicyFactory Interface.
>
> As the topic is idle at the beginning of the pipeline, the
> currentWatermark is advanced (backlog==0), because of which
> currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE) is False and
> the timestamp of the records are taken as currentWatermark.
>
> If we change else if() to else, IllegalStateException is thrown when the
> first record from the Kafka topic is read, which is expected.
> Is there any specific reason behind using else if() instead of else?
>
> Thanks and Regards,
> Rahul
