The intention was to assert on 'timestamp_type' for the first record only. I
was not entirely sure whether there are situations in Kafka where the
timestamp type could differ, or the timestamp itself could be missing, for
some records. The assertion on the first record was just a sanity check for
a common misconfiguration. The way this policy checks for the first record
is incorrect in the case of idle partitions, since the watermark advances
even without any records being read. This is the issue you encountered. When
a record's timestamp type does not match, its timestamp is not used for the
watermark.

As you suggested, a simpler fix might be to require every record's
timestamp_type to be LOG_APPEND_TIME (i.e. replace 'else if' with 'else').
Is that safe? We don't want users to get stuck if some topics are expected
to have multiple timestamp types.
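
To make the two behaviours concrete, here is a rough, self-contained sketch.
It is not the KafkaIO code: the class, the 'strict' flag and the long-valued
timestamps are stand-ins I made up for illustration, and MIN_WATERMARK only
plays the role of BoundedWindow.TIMESTAMP_MIN_VALUE. With strict=false it
mimics the current 'else if' check; with strict=true it mimics the proposed
'else'.

```java
class LogAppendTimeSketch {
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    // Stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE (assumption for this sketch).
    static final long MIN_WATERMARK = Long.MIN_VALUE;

    /** Simplified model of the policy; not the real Beam class. */
    static final class Policy {
        private final boolean strict;
        private long currentWatermark = MIN_WATERMARK;

        Policy(boolean strict) {
            this.strict = strict;
        }

        long getTimestampForRecord(TimestampType type, long timestampMillis) {
            if (type == TimestampType.LOG_APPEND_TIME) {
                currentWatermark = timestampMillis;
            } else if (strict || currentWatermark == MIN_WATERMARK) {
                // strict=false: only fires while the watermark is still at its minimum.
                // strict=true: fires for every record that lacks LOG_APPEND_TIME.
                throw new IllegalStateException("record does not have LOG_APPEND_TIME");
            }
            return currentWatermark;
        }

        long getWatermark(long backlog, long idleWatermarkMillis) {
            // An idle, caught-up partition advances the watermark without reading a record.
            if (backlog == 0 && idleWatermarkMillis > currentWatermark) {
                currentWatermark = idleWatermarkMillis;
            }
            return currentWatermark;
        }
    }

    /** Returns true if a CREATE_TIME record is rejected by the policy. */
    static boolean throwsOnCreateTime(boolean strict, boolean idleFirst) {
        Policy policy = new Policy(strict);
        if (idleFirst) {
            policy.getWatermark(0, 1_000L); // topic idle at pipeline start
        }
        try {
            policy.getTimestampForRecord(TimestampType.CREATE_TIME, 2_000L);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("else if, not idle: " + throwsOnCreateTime(false, false));
        System.out.println("else if, idle:     " + throwsOnCreateTime(false, true));
        System.out.println("else,    idle:     " + throwsOnCreateTime(true, true));
    }
}
```

In this model the 'else if' variant rejects the CREATE_TIME record only when
no idle advance happened first, while the 'else' variant rejects it in both
cases. The same strictness would also reject any later CREATE_TIME record on
a topic with mixed timestamp types, which is the case I am worried about.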

Raghu.
On Thu, Jan 3, 2019 at 11:22 PM rahul patwari <[email protected]>
wrote:

> Hi,
>
> We are using KafkaIO.read() with LogAppendTimePolicy. When the topic is
> idle at the beginning of the pipeline, IllegalStateException is NOT thrown
> even when log.message.timestamp.type = CreateTime.
>
> This happens due to the statement:
> else if (currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) in
> the getTimestampForRecord() method in the TimestampPolicyFactory interface.
>
> As the topic is idle at the beginning of the pipeline, the
> currentWatermark is advanced (backlog==0), because of which
> currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE) is false and
> the records are assigned currentWatermark as their timestamp.
>
> If we change else if() to else, IllegalStateException is thrown when the
> first record from the Kafka topic is read, which is expected.
> Is there any specific reason behind using else if() instead of else?
>
> Thanks and Regards,
> Rahul
>
>
