On Wed, May 22, 2019 at 11:17 AM rahul patwari <[email protected]>
wrote:

> Will the watermark also be checkpointed by default, along with the offset of
> the partition?
>
> We have found a limitation in CustomTimestampPolicyWithLimitedDelay.
> Consider this scenario: we are processing a stream of events from Kafka
> whose event timestamps lag the current processing time (say, by 1 day),
> and I set maxDelay to 1 day. When the topic is idle for some time, the
> watermark advances to the current time, so any data arriving later in the
> pipeline is discarded as late (because its event timestamps are 1 day old).
>

This seems like a bug: according to the CustomTimestampPolicyWithLimitedDelay
javadoc [1], when the topic is empty the watermark should only advance to
currentTime - maxDelay, not to currentTime.
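To make the expected behavior concrete, here is a plain-Java model of the rule as we read it in the javadoc [1]: while records flow, the watermark is min(now, max event timestamp seen) - maxDelay; when the partition is idle it advances to now - maxDelay (never to now); and it never moves backwards. This is not Beam's class. The name LimitedDelayWatermarkModel, the explicit partitionIdle flag (standing in for the backlog information KafkaIO supplies to a policy), and the use of java.time instead of Joda-Time are all assumptions for illustration.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical model (not Beam's CustomTimestampPolicyWithLimitedDelay) of the
 * documented watermark rule, as we read it:
 *   active partition: min(now, maxEventTimestamp) - maxDelay
 *   idle partition:   now - maxDelay
 * The reported watermark never moves backwards.
 */
final class LimitedDelayWatermarkModel {
  private final Duration maxDelay;
  private Instant maxEventTimestamp = Instant.EPOCH; // assumed initial value
  private Instant currentWatermark;

  LimitedDelayWatermarkModel(Duration maxDelay, Instant previousWatermark) {
    this.maxDelay = maxDelay;
    this.currentWatermark = previousWatermark;
  }

  /** Tracks the largest event timestamp seen so far. */
  void onRecord(Instant eventTimestamp) {
    if (eventTimestamp.isAfter(maxEventTimestamp)) {
      maxEventTimestamp = eventTimestamp;
    }
  }

  /** partitionIdle is an assumed stand-in for "no backlog in the partition". */
  Instant getWatermark(Instant now, boolean partitionIdle) {
    Instant candidate = partitionIdle
        ? now.minus(maxDelay)                        // idle: now - maxDelay, not now
        : min(now, maxEventTimestamp).minus(maxDelay);
    if (candidate.isAfter(currentWatermark)) {
      currentWatermark = candidate;                  // only ever move forward
    }
    return currentWatermark;
  }

  private static Instant min(Instant a, Instant b) {
    return a.isBefore(b) ? a : b;
  }
}
```

For example, with maxDelay of one day and a last event stamped 2019-05-21T11:00Z, an idle check at 2019-05-22T12:00Z yields 2019-05-21T12:00Z under this rule, whereas the behavior reported above would yield 2019-05-22T12:00Z.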

> Also, can we use an instance variable (which is not checkpointed) in the
> class that implements the createTimestampPolicy method of the
> TimestampPolicyFactory interface, to track how long the Kafka topic has been
> idle and advance the watermark accordingly, instead of moving the watermark
> to the current time?
>
> Regards,
> Rahul
>
> On Wed, May 22, 2019 at 9:04 AM rahul patwari <[email protected]>
> wrote:
>
>> Hi,
>>
>> We are using the withTimestampPolicyFactory(TimestampPolicyFactory<K,V>
>> timestampPolicyFactory) method in KafkaIO.Read
>> <https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory->,
>> where we have written a lambda for
>> createTimestampPolicy(org.apache.kafka.common.TopicPartition tp,
>> java.util.Optional<org.joda.time.Instant> previousWatermark)
>> <https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.html#createTimestampPolicy-org.apache.kafka.common.TopicPartition-java.util.Optional->.
>>
>> Sample code:
>> KafkaIO.Read<String, GenericRecord> kafkaIoRead =
>>     KafkaIO.<String, GenericRecord>read()
>>         .withBootstrapServers(bootstrapServerUrl)
>>         .withTopic(topicName)
>>         .withKeyDeserializer(StringDeserializer.class)
>>         .withValueDeserializerAndCoder(GenericAvroDeserializer.class,
>>             AvroCoder.of(GenericRecord.class, avroSchema))
>>         // Called with the partition and the last checkpointed watermark, if any.
>>         .withTimestampPolicyFactory(
>>             (tp, prevWatermark) -> new KafkaCustomTimestampPolicy(prevWatermark));
>>
>> The topic we are reading from has only one partition.
>> Over the lifecycle of the pipeline, the KafkaCustomTimestampPolicy instance
>> is created multiple times.
>>
>> Is there any documentation describing the guidelines one should follow
>> when implementing a custom watermark?
>>
>
See the javadoc for getWatermark on unbounded sources [2].
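As we read that javadoc [2], the watermark a reader reports should be at or before the timestamps of all records it reads afterwards, and records that violate this are treated as late. The plain-Java sketch below illustrates one simple case, a source whose records arrive in timestamp order, where the watermark can be the last timestamp read. The class and method names are hypothetical, not Beam API, and java.time stands in for Joda-Time.

```java
import java.time.Instant;

/**
 * Hypothetical sketch (not Beam API) of the getWatermark contract for a source
 * assumed to emit records in timestamp order: the watermark is the latest
 * timestamp read so far, and any later record stamped before it is late.
 */
final class InOrderReaderSketch {
  // Instant.MIN stands in for "no watermark yet".
  private Instant lastTimestamp = Instant.MIN;

  /** At or before the timestamps of all records expected later, given in-order input. */
  Instant getWatermark() {
    return lastTimestamp;
  }

  /** Consumes one record and reports whether it is late relative to the current watermark. */
  boolean read(Instant eventTimestamp) {
    boolean late = eventTimestamp.isBefore(getWatermark());
    if (eventTimestamp.isAfter(lastTimestamp)) {
      lastTimestamp = eventTimestamp;
    }
    return late;
  }
}
```

For example, after reading records stamped 10:00 and 11:00 the watermark is 11:00, so a record stamped 10:30 that shows up afterwards is late. A policy that expects out-of-order data needs a delay allowance, which is the role maxDelay plays.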


>> How does checkpointing affect the watermark?

The watermark doesn't advance while a source is checkpointed and not actively
being processed; it advances only based on the values that the source
reports.
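The quoted stack trace shows KafkaCustomTimestampPolicy being built inside the KafkaUnboundedReader constructor, so a new policy instance appears each time the runner creates a reader. Assuming, from the createTimestampPolicy signature, that the only watermark state a new instance receives is previousWatermark, anything kept in ordinary fields (such as an idle timer) starts over. The plain-Java sketch below illustrates this; RestorablePolicySketch, lastRecordSeenAt, and the method names are hypothetical, not Beam's TimestampPolicy API, and java.time stands in for Joda-Time.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/**
 * Hypothetical sketch (not Beam's TimestampPolicy API) of a policy re-created
 * from a checkpoint. Only the watermark is handed over, as previousWatermark;
 * ordinary fields such as lastRecordSeenAt start over.
 */
final class RestorablePolicySketch {
  private Instant watermark;         // seeded from the checkpointed value, if any
  private Instant lastRecordSeenAt;  // plain instance state: NOT checkpointed

  RestorablePolicySketch(Optional<Instant> previousWatermark, Instant now) {
    // Instant.MIN stands in for "no watermark yet".
    this.watermark = previousWatermark.orElse(Instant.MIN);
    // Idle tracking restarts from the moment this instance is created.
    this.lastRecordSeenAt = now;
  }

  void onRecord(Instant now) {
    lastRecordSeenAt = now;
  }

  /** How long this instance has gone without seeing a record. */
  Duration idleFor(Instant now) {
    return Duration.between(lastRecordSeenAt, now);
  }

  /** Moves the watermark forward, never backwards past the restored value. */
  Instant advanceTo(Instant proposed) {
    if (proposed.isAfter(watermark)) {
      watermark = proposed;
    }
    return watermark;
  }

  Instant getWatermark() {
    return watermark;
  }
}
```

For example, if the first instance has gone ten minutes without a record when the runner replaces it, the new instance restores the watermark but reports zero idle time, so an idle timeout kept only in a field would restart after every re-creation.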


>> Stack trace from the constructor of KafkaCustomTimestampPolicy:
>>
>>   com.beam.transforms.KafkaCustomTimestampPolicy.<init>(KafkaCustomTimestampPolicy.java:41),
>>   com.beam.transforms.CreateKafkaSource.lambda$createNewInstance$bf84864f$1(CreateKafkaSource.java:99), (KafkaIO.Read instance is created here)
>>   org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:536),
>>   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:126),
>>   org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43),
>>   org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:226),
>>   org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:132),
>>   org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160),
>>   org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124),
>>   java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511),
>>   java.util.concurrent.FutureTask.run(FutureTask.java:266),
>>   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149),
>>   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624),
>>   java.lang.Thread.run(Thread.java:748)

1:
https://github.com/apache/beam/blob/de08c89fe941f8c5697442615f6d2d1d4340aa38/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L33

2:
https://github.com/apache/beam/blob/de08c89fe941f8c5697442615f6d2d1d4340aa38/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L228
