Will the watermark also be checkpointed by default, along with the offset of
the partition?

We have found a limitation in CustomTimestampPolicyWithLimitedDelay. Consider
this scenario: we are processing a stream of events from Kafka whose event
timestamps are older than the current processing time (say, by 1 day), and
maxDelay is set to 1 day. When the topic is idle for some time, the watermark
advances to the current time. Any data that arrives later in the pipeline is
then considered late and discarded, because its event timestamps are 1 day
old.
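
To make the arithmetic of this scenario concrete, here is a minimal sketch in
plain Java. It models only the behaviour as described in this message, not
Beam's actual implementation. The class IdleWatermarkScenario and its methods
are hypothetical names, java.time is used in place of Joda-Time to keep the
example self-contained, and "late" is simplified to "event timestamp before
the watermark".

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model of the scenario described above; NOT Beam's implementation.
final class IdleWatermarkScenario {

    // While records are flowing: highest event time seen minus the allowed delay.
    static Instant activeWatermark(Instant maxEventTime, Duration maxDelay) {
        return maxEventTime.minus(maxDelay);
    }

    // After the topic goes idle, as described above: the watermark jumps to "now".
    static Instant idleWatermark(Instant now) {
        return now;
    }

    // Simplification: a record is late when its event timestamp is before the watermark.
    static boolean isLate(Instant eventTime, Instant watermark) {
        return eventTime.isBefore(watermark);
    }

    public static void main(String[] args) {
        Duration maxDelay = Duration.ofDays(1);
        Instant now = Instant.parse("2019-05-22T09:00:00Z");
        Instant eventTime = now.minus(Duration.ofDays(1)); // events are 1 day old

        Instant whileActive = activeWatermark(eventTime, maxDelay);
        Instant afterIdle = idleWatermark(now);

        System.out.println("late while active: " + isLate(eventTime, whileActive));
        System.out.println("late after idle:   " + isLate(eventTime, afterIdle));
    }
}
```

Under these assumptions, a record that is 1 day old is accepted while the
topic is active and treated as late once the watermark has jumped to the
current time.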

Also, in the class that implements the createTimestampPolicy method of the
TimestampPolicyFactory interface, can we use an instance variable that cannot
be checkpointed? We would use it to calculate the idle time of the Kafka
topic and advance the watermark accordingly, instead of moving the watermark
to the current time.
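
The idea can be sketched roughly as follows. This is only an illustration of
the approach, not a working Beam TimestampPolicy: the class IdleAwareWatermark
and its methods are hypothetical, java.time stands in for Joda-Time, and
Instant.EPOCH is an assumed starting value when no previous watermark exists.
The field lastRecordSeenAt is the non-checkpointed instance variable in
question; only the previous watermark is handed back in, mirroring the
previousWatermark parameter of createTimestampPolicy.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical sketch: advance the watermark by the observed idle time
// instead of jumping to the current time.
final class IdleAwareWatermark {
    private final Duration maxDelay;
    private Instant maxEventTime;     // highest event timestamp seen; null until first record
    private Instant lastRecordSeenAt; // wall-clock time of the last record; NOT checkpointed
    private Instant lastWatermark;    // last value returned; keeps the watermark monotonic

    IdleAwareWatermark(Duration maxDelay, Optional<Instant> previousWatermark) {
        this.maxDelay = maxDelay;
        // Assumption: start from the epoch when no previous watermark is available.
        this.lastWatermark = previousWatermark.orElse(Instant.EPOCH);
    }

    // Call for every record, passing its event timestamp and the current wall-clock time.
    void onRecord(Instant eventTime, Instant now) {
        if (maxEventTime == null || eventTime.isAfter(maxEventTime)) {
            maxEventTime = eventTime;
        }
        lastRecordSeenAt = now;
    }

    // Watermark = max event time + idle duration - maxDelay, never moving backwards.
    Instant watermark(Instant now) {
        if (maxEventTime == null) {
            return lastWatermark; // nothing read yet (for example, right after a restart)
        }
        Duration idle = Duration.between(lastRecordSeenAt, now);
        if (idle.isNegative()) {
            idle = Duration.ZERO;
        }
        Instant candidate = maxEventTime.plus(idle).minus(maxDelay);
        if (candidate.isAfter(lastWatermark)) {
            lastWatermark = candidate;
        }
        return lastWatermark;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2019-05-22T09:00:00Z");
        IdleAwareWatermark wm = new IdleAwareWatermark(Duration.ofDays(1), Optional.empty());
        wm.onRecord(t0.minus(Duration.ofDays(1)), t0);              // record that is 1 day old
        System.out.println(wm.watermark(t0));                       // right after the record
        System.out.println(wm.watermark(t0.plus(Duration.ofHours(1)))); // after 1 hour idle
    }
}
```

With this approach the watermark keeps the same lag behind processing time
while the topic is idle, so a record that arrives an hour later with a
1-day-old timestamp is still ahead of the watermark. The open question is
what happens to lastRecordSeenAt when the policy instance is re-created.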

Regards,
Rahul

On Wed, May 22, 2019 at 9:04 AM rahul patwari <rahulpatwari8...@gmail.com>
wrote:

> Hi,
>
> We are using the
> withTimestampPolicyFactory(TimestampPolicyFactory<K,V> timestampPolicyFactory)
> <https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory->
> method in KafkaIO.Read, where we have written a lambda for
> createTimestampPolicy(org.apache.kafka.common.TopicPartition tp,
> java.util.Optional<Instant> previousWatermark)
> <https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.html#createTimestampPolicy-org.apache.kafka.common.TopicPartition-java.util.Optional->.
>
> Sample code:
> KafkaIO.Read<String, GenericRecord> kafkaIoRead =
>         KafkaIO.<String, GenericRecord>read()
>                 .withBootstrapServers(bootstrapServerUrl)
>                 .withTopic(topicName)
>                 .withKeyDeserializer(StringDeserializer.class)
>                 .withValueDeserializerAndCoder(GenericAvroDeserializer.class,
>                         AvroCoder.of(GenericRecord.class, avroSchema))
>                 .withTimestampPolicyFactory((tp, prevWatermark) ->
>                         new KafkaCustomTimestampPolicy(prevWatermark));
>
> The topic we are reading from has only one partition.
> During the lifecycle of the pipeline, the KafkaCustomTimestampPolicy
> instance is created multiple times.
>
> Is there any documentation describing the guidelines one should follow
> when implementing a custom watermark?
> How does checkpointing affect the watermark?
>
> Stack trace from the constructor of KafkaCustomTimestampPolicy:
>
>  com.beam.transforms.KafkaCustomTimestampPolicy.<init>(KafkaCustomTimestampPolicy.java:41),
>  com.beam.transforms.CreateKafkaSource.lambda$createNewInstance$bf84864f$1(CreateKafkaSource.java:99), (KafkaIO.Read instance is created here)
>  org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:536),
>  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:126),
>  org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43),
>  org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:226),
>  org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:132),
>  org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160),
>  org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124),
>  java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511),
>  java.util.concurrent.FutureTask.run(FutureTask.java:266),
>  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149),
>  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624),
>  java.lang.Thread.run(Thread.java:748)
>
