Hi Raghu,
Thanks for the response.

I used withTopics() and withCreateTime() to read records from two
topics: "topic1" with message.timestamp.type=CreateTime and "topic2"
with message.timestamp.type=LogAppendTime.
I got this exception: java.lang.IllegalArgumentException: Kafka record's
timestamp is not 'CREATE_TIME' (topic: topic2, partition 0, offset 0,
timestamp type 'LogAppendTime').

So my understanding is that the concern about users getting stuck when
topics have multiple timestamp types applies only when withLogAppendTime()
is used. In that case, when two topics have different timestamp types, the
watermark is calculated only from the records that belong to the topic
whose timestamp type is LogAppendTime. I think we could calculate the
watermark more accurately if we also considered the records in the topic
with timestamp type CreateTime. So, instead of returning currentWatermark
directly, we could always update currentWatermark with the record's
timestamp (either CreateTime or LogAppendTime) and then return
currentWatermark.
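
To make the proposal concrete, here is a minimal sketch that uses a
simplified stand-in rather than the actual Beam classes. WatermarkSketch,
TimestampType, MIN_TIMESTAMP and both method names are made up for
illustration, and timestamps are plain epoch milliseconds. It does not
address CreateTime values that arrive out of order.

```java
// Simplified stand-in for the policy discussed in this thread; NOT the Beam classes.
final class WatermarkSketch {

  // Hypothetical mirror of Kafka's two record timestamp types.
  enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

  // Stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE (assumption: plain long millis).
  static final long MIN_TIMESTAMP = Long.MIN_VALUE;

  private long currentWatermark = MIN_TIMESTAMP;

  // Behaviour as described in the thread: only LOG_APPEND_TIME records move the
  // watermark, and a mismatch is reported only while the watermark is still at
  // its minimum value.
  long timestampCurrent(TimestampType type, long recordTimestampMillis) {
    if (type == TimestampType.LOG_APPEND_TIME) {
      currentWatermark = recordTimestampMillis;
    } else if (currentWatermark == MIN_TIMESTAMP) {
      throw new IllegalStateException("first record is not LOG_APPEND_TIME");
    }
    return currentWatermark;
  }

  // Proposed variant: every record's timestamp updates the watermark,
  // whichever timestamp type it carries.
  long timestampProposed(TimestampType type, long recordTimestampMillis) {
    currentWatermark = recordTimestampMillis;
    return currentWatermark;
  }

  public static void main(String[] args) {
    WatermarkSketch current = new WatermarkSketch();
    current.timestampCurrent(TimestampType.LOG_APPEND_TIME, 1000L);
    // The CreateTime record is ignored, so the watermark stays at 1000.
    System.out.println(current.timestampCurrent(TimestampType.CREATE_TIME, 5000L));

    WatermarkSketch proposed = new WatermarkSketch();
    proposed.timestampProposed(TimestampType.LOG_APPEND_TIME, 1000L);
    // The CreateTime record is used, so the watermark moves to 5000.
    System.out.println(proposed.timestampProposed(TimestampType.CREATE_TIME, 5000L));
  }
}
```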

I would like to contribute if a fix is needed.

Regards,
Rahul

On Sat, Jan 5, 2019 at 12:40 AM Raghu Angadi <rang...@google.com> wrote:

> The intention was to assert on 'timestamp_type' on the first record only.
> I was not entirely sure if there are situations in Kafka where a timestamp
> type could be different or timestamp itself could be missing for some
> records. The assertion on the first record was just a sanity check for a
> common misconfiguration. The way this policy checks for the first record is
> itself incorrect in the case of idle partitions, since the watermark
> advances even without any records read. This is the issue you encountered.
> When the timestamp type does not match, its timestamp is not used for the
> watermark.
>
> As you suggested, a simpler fix might just be to require every record's
> timestamp_type to be LOG_APPEND_TIME (i.e. replace 'else if' with 'else').
> Is that safe? We don't want users to get stuck if some topics are expected
> to have multiple timestamp types.
>
> Raghu.
> On Thu, Jan 3, 2019 at 11:22 PM rahul patwari <rahulpatwari8...@gmail.com>
> wrote:
>
>> Hi,
>>
>> We are using KafkaIO.read() with LogAppendTimePolicy. When the topic is
>> idle at the beginning of the pipeline, IllegalStateException is NOT thrown
>> even when log.message.timestamp.type = CreateTime.
>>
>> This happens due to the statement:
>> else if (currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) in
>> getTimestampForRecord() method in TimestampPolicyFactory Interface.
>>
>> As the topic is idle at the beginning of the pipeline, the
>> currentWatermark is advanced (backlog==0), because of which
>> currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE) is false and
>> the timestamp of each record is taken to be currentWatermark.
>>
>> If we change else if() to else, IllegalStateException is thrown when the
>> first record from the Kafka topic is read, which is expected.
>> Is there any specific reason behind using else if() instead of else?
>>
>> Thanks and Regards,
>> Rahul
>>
>>
