The watermark should be checkpointed along with the partition offsets. A new instance of your watermark class is created for each bundle that is processed. There is one bundle per checkpoint, one bundle per split (so that Kafka can be read in parallel by multiple workers), and one bundle for the initial work that needs to be done. On Spark, the roughly 800 instances you counted are most likely all due to checkpointing.
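To make the instance count concrete, here is a minimal plain-Java sketch of that lifecycle. It is a model only and does not use the Beam or KafkaIO API: LimitedDelayPolicy, observe, watermark and simulate are hypothetical names, and the "idle" flag stands in for whatever backlog check a real policy would do. It assumes each bundle builds a fresh policy seeded with the last checkpointed watermark.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Plain-Java model of the behavior discussed above. LimitedDelayPolicy is a
// hypothetical stand-in, not Beam's TimestampPolicy or KafkaIO API.
class WatermarkCheckpointSketch {

    // Counts constructor calls, like the static counter described in the thread.
    static int instancesCreated = 0;

    static final class LimitedDelayPolicy {
        private final Duration maxDelay;
        private Instant maxEventTimestamp = Instant.EPOCH;
        private Instant lastWatermark;

        LimitedDelayPolicy(Duration maxDelay, Optional<Instant> previousWatermark) {
            instancesCreated++;
            this.maxDelay = maxDelay;
            // Resume from the checkpointed watermark so it never moves backwards.
            this.lastWatermark = previousWatermark.orElse(Instant.EPOCH);
        }

        // Tracks the largest event timestamp seen by this instance.
        void observe(Instant eventTimestamp) {
            if (eventTimestamp.isAfter(maxEventTimestamp)) {
                maxEventTimestamp = eventTimestamp;
            }
        }

        // Idle topic: now - maxDelay. Otherwise: max event time - maxDelay.
        // The result is never earlier than the watermark already reported.
        Instant watermark(Instant now, boolean topicIdle) {
            Instant candidate = (topicIdle ? now : maxEventTimestamp).minus(maxDelay);
            if (candidate.isAfter(lastWatermark)) {
                lastWatermark = candidate;
            }
            return lastWatermark;
        }
    }

    // Simulates three bundles; each one builds a fresh policy from the last checkpoint.
    static Instant[] simulate() {
        instancesCreated = 0;
        Duration maxDelay = Duration.ofDays(1);
        Instant[] watermarks = new Instant[3];

        // Bundle 1: initial work, no checkpoint yet, one record that is a day old.
        LimitedDelayPolicy p1 = new LimitedDelayPolicy(maxDelay, Optional.empty());
        p1.observe(Instant.parse("2019-05-21T12:00:00Z"));
        watermarks[0] = p1.watermark(Instant.parse("2019-05-22T12:00:00Z"), false);

        // Bundle 2: resumed from the checkpoint, topic is idle.
        LimitedDelayPolicy p2 = new LimitedDelayPolicy(maxDelay, Optional.of(watermarks[0]));
        watermarks[1] = p2.watermark(Instant.parse("2019-05-22T12:05:00Z"), true);

        // Bundle 3: resumed again, nothing read yet; the watermark must not regress.
        LimitedDelayPolicy p3 = new LimitedDelayPolicy(maxDelay, Optional.of(watermarks[1]));
        watermarks[2] = p3.watermark(Instant.parse("2019-05-22T12:06:00Z"), false);
        return watermarks;
    }

    public static void main(String[] args) {
        for (Instant w : simulate()) {
            System.out.println(w);
        }
        System.out.println("instances created: " + instancesCreated);
    }
}
```

In this model three bundles produce three instances, and the idle bundle moves the watermark to (now - maxDelay) rather than to now. Under the same assumption, a counter near 800 after about 3 minutes would correspond to a few bundles per second.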

On Wed, May 22, 2019 at 8:29 PM rahul patwari <[email protected]> wrote:

> Hi Lukasz,
>
> There was a bug in my code. When the topic is idle, I indeed get watermark
> as (now - maxDelay).
>
> I have a few questions:
> I have created a static int variable in my watermark class and incremented
> the variable inside the constructor. I ran the pipeline in SparkRunner for
> approximately 3 minutes, and in the logs, I see that the counter value is
> around 800, which implies that my watermark class instance is created 800
> times.
>
> previousWatermark javadoc[1] suggests that it is the latest checkpointed
> watermark. Does this mean watermark is checkpointed along with partition
> offsets? As the counter value is around 800, does this mean watermark is
> checkpointed 800 times? Does this mean my watermark class instance will be
> created after every new checkpoint?
>
> 1:
> https://github.com/apache/beam/blob/de08c89fe941f8c5697442615f6d2d1d4340aa38/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L49
>
> Thanks,
> Rahul
>
> On Thu, May 23, 2019 at 12:23 AM Lukasz Cwik <[email protected]> wrote:
>
>> On Wed, May 22, 2019 at 11:17 AM rahul patwari <[email protected]> wrote:
>>
>>> will watermark also get checkpointed by default along with the offset of
>>> the partition?
>>>
>>> We have found a limitation for CustomTimestampPolicyWithLimitedDelay.
>>> Consider this scenario:
>>> If we are processing a stream of events from Kafka with event timestamps
>>> older than the current processing time(say 1 day), and If I set maxDelay as
>>> 1 day, when the topic is idle(for some time), watermark will advance to the
>>> current time, thereby discarding any data which arrives later in the
>>> pipeline(as the event timestamps are 1 day old) considering them as late.
>>>
>>
>> This seems like a bug since the watermark should only advance to
>> currentTime - maxDelay if the topic is empty based upon the
>> CustomTimestampPolicyWithLimitedDelay javadoc[1].
>>
>>> Also, can we use an instance variable(to calculate idle time in Kafka
>>> topic and advance watermark accordingly, instead of moving the watermark to
>>> the current time) which cannot be checkpointed, for the class which
>>> implements the createTimestampPolicy method in TimestampPolicyFactory
>>> interface?
>>>
>>> Regards,
>>> Rahul
>>>
>>> On Wed, May 22, 2019 at 9:04 AM rahul patwari <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> We are using
>>>> withTimestampPolicyFactory(TimestampPolicyFactory<K, V> timestampPolicyFactory)
>>>> <https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory->
>>>> method in KafkaIO.Read, where we have written a lambda for
>>>> createTimestampPolicy(org.apache.kafka.common.TopicPartition tp, java.util.Optional<Instant> previousWatermark)
>>>> <https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.html#createTimestampPolicy-org.apache.kafka.common.TopicPartition-java.util.Optional->.
>>>>
>>>> Sample code:
>>>> KafkaIO.Read<String, GenericRecord> kafkaIoRead =
>>>>     KafkaIO.<String, GenericRecord>read()
>>>>         .withBootstrapServers(bootstrapServerUrl)
>>>>         .withTopic(topicName)
>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>         .withValueDeserializerAndCoder(GenericAvroDeserializer.class,
>>>>             AvroCoder.of(GenericRecord.class, avroSchema))
>>>>         .withTimestampPolicyFactory((tp, prevWatermark) ->
>>>>             new KafkaCustomTimestampPolicy(prevWatermark));
>>>>
>>>> The topic we are reading from only has one partition.
>>>> In the lifecycle of the pipeline, KafkaCustomTimestampPolicy instance
>>>> is being created multiple times.
>>>>
>>>> Is there any documentation describing the guidelines one should follow
>>>> when implementing custom watermark?
>>>>
>>
>> Javadoc for getWatermark on unbounded sources[2].
>>
>>>> How does checkpointing affect the watermark?
>>>>
>>
>> The watermark doesn't advance while a source is checkpointed and not
>> being actively processed. The watermark only advances based upon the values
>> that the source provides.
>>
>>>>
>>>> StackTrace from constructor of KafkaCustomTimestampPolicy:
>>>>
>>>> com.beam.transforms.KafkaCustomTimestampPolicy.<init>(KafkaCustomTimestampPolicy.java:41),
>>>> com.beam.transforms.CreateKafkaSource.lambda$createNewInstance$bf84864f$1(CreateKafkaSource.java:99),(KafkaIO.Read instance is created here)
>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:536),
>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:126),
>>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43),
>>>> org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:226),
>>>> org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:132),
>>>> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160),
>>>> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124),
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511),
>>>> java.util.concurrent.FutureTask.run(FutureTask.java:266),
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149),
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624),
>>>> java.lang.Thread.run(Thread.java:748)
>>>>
>>
>> 1:
>> https://github.com/apache/beam/blob/de08c89fe941f8c5697442615f6d2d1d4340aa38/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L33
>>
>> 2:
>> https://github.com/apache/beam/blob/de08c89fe941f8c5697442615f6d2d1d4340aa38/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L228
>>
