On Wed, May 22, 2019 at 11:17 AM rahul patwari <[email protected]> wrote:
> Will the watermark also get checkpointed by default along with the offset of
> the partition?
>
> We have found a limitation for CustomTimestampPolicyWithLimitedDelay. Consider
> this scenario:
> If we are processing a stream of events from Kafka with event timestamps
> older than the current processing time (say, 1 day), and if I set maxDelay to
> 1 day, then when the topic is idle (for some time), the watermark will advance
> to the current time, thereby discarding any data which arrives later in the
> pipeline (as its event timestamps are 1 day old) by considering it late.
>

This seems like a bug: according to the CustomTimestampPolicyWithLimitedDelay javadoc[1], when the topic is empty the watermark should only advance to currentTime - maxDelay, not to currentTime.

> Also, can we use an instance variable (to calculate idle time in the Kafka
> topic and advance the watermark accordingly, instead of moving the watermark
> to the current time), which cannot be checkpointed, in the class that
> implements the createTimestampPolicy method of the TimestampPolicyFactory
> interface?
>
> Regards,
> Rahul
>
> On Wed, May 22, 2019 at 9:04 AM rahul patwari <[email protected]>
> wrote:
>
>> Hi,
>>
>> We are using the method
>> withTimestampPolicyFactory(TimestampPolicyFactory<K, V> timestampPolicyFactory)
>> <https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory->
>> in KafkaIO.Read, where we have written a lambda for
>> createTimestampPolicy(org.apache.kafka.common.TopicPartition tp,
>> java.util.Optional<org.joda.time.Instant> previousWatermark)
>> <https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.html#createTimestampPolicy-org.apache.kafka.common.TopicPartition-java.util.Optional->.
>>
>> Sample code:
>> KafkaIO.Read<String, GenericRecord> kafkaIoRead =
>>     KafkaIO.<String, GenericRecord>read()
>>         .withBootstrapServers(bootstrapServerUrl)
>>         .withTopic(topicName)
>>         .withKeyDeserializer(StringDeserializer.class)
>>         .withValueDeserializerAndCoder(GenericAvroDeserializer.class,
>>             AvroCoder.of(GenericRecord.class, avroSchema))
>>         .withTimestampPolicyFactory((tp, prevWatermark) ->
>>             new KafkaCustomTimestampPolicy(prevWatermark));
>>
>> The topic we are reading from has only one partition.
>> Over the lifecycle of the pipeline, the KafkaCustomTimestampPolicy instance
>> is created multiple times.
>>
>> Is there any documentation describing the guidelines one should follow
>> when implementing a custom watermark?
>>

Javadoc for getWatermark on unbounded sources[2].
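To make the limited-delay rule from the CustomTimestampPolicyWithLimitedDelay javadoc concrete, here is a minimal plain-Java sketch under stated assumptions: it is not Beam's class, the name LimitedDelayWatermark and its methods are hypothetical, and the backlog argument stands in for the partition's message backlog, with zero treated as idle. With maxDelay = 1 day, an idle partition reports now - maxDelay rather than now, so records up to a day old are not yet late.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical standalone model (not Beam's CustomTimestampPolicyWithLimitedDelay):
// watermark = min(now, max event timestamp) - maxDelay, and an idle partition
// advances it to now - maxDelay, never to now itself.
class LimitedDelayWatermark {
  private final Duration maxDelay;
  // Largest event timestamp seen so far; empty until a record is read or a watermark is restored.
  private Optional<Instant> maxEventTimestamp;

  LimitedDelayWatermark(Duration maxDelay, Optional<Instant> previousWatermark) {
    this.maxDelay = maxDelay;
    // Seed from the restored watermark so the first value reported equals it.
    this.maxEventTimestamp = previousWatermark.map(w -> w.plus(maxDelay));
  }

  // Tracks the maximum event timestamp and returns the record's timestamp unchanged.
  Instant onRecord(Instant eventTimestamp) {
    if (!maxEventTimestamp.isPresent() || eventTimestamp.isAfter(maxEventTimestamp.get())) {
      maxEventTimestamp = Optional.of(eventTimestamp);
    }
    return eventTimestamp;
  }

  // backlog: number of unread messages in the partition (assumption: 0 means idle).
  Optional<Instant> watermark(Instant now, long backlog) {
    if (!maxEventTimestamp.isPresent()) {
      return Optional.empty(); // nothing to base a watermark on yet
    }
    if (backlog == 0) {
      // Idle: events up to maxDelay old are still on time after this advance.
      return Optional.of(now.minus(maxDelay));
    }
    Instant max = maxEventTimestamp.get();
    // Timestamps in the future are capped at 'now' before subtracting the delay.
    Instant capped = max.isAfter(now) ? now : max;
    return Optional.of(capped.minus(maxDelay));
  }
}
```

With events that are 1 day old and maxDelay = 1 day, this model reports now minus 2 days while the partition is busy and now minus 1 day once it is idle; it never jumps to now.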
>
>> How does checkpointing affect the watermark?
>>

The watermark doesn't advance while a source is checkpointed and not being actively processed. The watermark only advances based upon the values that the source provides.

>
>>
>> StackTrace from constructor of KafkaCustomTimestampPolicy:
>>
>> com.beam.transforms.KafkaCustomTimestampPolicy.<init>(KafkaCustomTimestampPolicy.java:41),
>> com.beam.transforms.CreateKafkaSource.lambda$createNewInstance$bf84864f$1(CreateKafkaSource.java:99), (KafkaIO.Read instance is created here)
>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:536),
>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:126),
>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43),
>> org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:226),
>> org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:132),
>> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160),
>> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124),
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511),
>> java.util.concurrent.FutureTask.run(FutureTask.java:266),
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149),
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624),
>> java.lang.Thread.run(Thread.java:748)
>>

1: https://github.com/apache/beam/blob/de08c89fe941f8c5697442615f6d2d1d4340aa38/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L33
2: https://github.com/apache/beam/blob/de08c89fe941f8c5697442615f6d2d1d4340aa38/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L228
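On the instance-variable question: the stack trace in this thread shows the policy being constructed again whenever a reader is created, and the only state createTimestampPolicy hands a new instance is previousWatermark. Any plain field, such as an idle timer, therefore starts over on each re-creation. Below is a hedged plain-Java sketch of idle tracking that tolerates this. The names IdleTrackingPolicy, idleTimeout and createdAt are hypothetical (not Beam API), and Instant.MIN stands in for Beam's minimum timestamp. The design choice is that the idle clock restarts at construction, so a restored instance waits a full idleTimeout before advancing the watermark to now - maxDelay.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical sketch (not a Beam class): idle detection kept in ordinary fields.
// Nothing here is checkpointed; only the watermark passed to the constructor
// survives when a new instance is created.
class IdleTrackingPolicy {
  private final Duration maxDelay;
  private final Duration idleTimeout;
  private Instant watermark;    // restored from previousWatermark, else Instant.MIN
  private Instant lastActivity; // wall-clock time of the last record; lost on re-creation

  IdleTrackingPolicy(Duration maxDelay, Duration idleTimeout,
                     Optional<Instant> previousWatermark, Instant createdAt) {
    this.maxDelay = maxDelay;
    this.idleTimeout = idleTimeout;
    // Instant.MIN is a stand-in for Beam's minimum timestamp.
    this.watermark = previousWatermark.orElse(Instant.MIN);
    // The idle clock restarts on every (re)creation, so a restored instance
    // waits a full idleTimeout before treating the partition as idle.
    this.lastActivity = createdAt;
  }

  void onRecord(Instant eventTimestamp, Instant now) {
    lastActivity = now;
    Instant capped = eventTimestamp.isAfter(now) ? now : eventTimestamp;
    advanceTo(capped.minus(maxDelay));
  }

  Instant getWatermark(Instant now) {
    if (Duration.between(lastActivity, now).compareTo(idleTimeout) >= 0) {
      advanceTo(now.minus(maxDelay)); // idle: move to now - maxDelay, not to now
    }
    return watermark;
  }

  // Only ever moves the watermark forward.
  private void advanceTo(Instant candidate) {
    if (candidate.isAfter(watermark)) {
      watermark = candidate;
    }
  }
}
```

Restarting the idle clock on construction errs on the side of holding the watermark back slightly longer after a restore, rather than advancing it based on activity the new instance never observed.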
