This is per-partition, right? In that case I assume it will match the current Kafka watermark.
On Thu, May 28, 2020 at 9:03 PM Boyuan Zhang <[email protected]> wrote:

> Hi Reuven,
>
> I'm going to use MonotonicallyIncreasing
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java#L105>
> by default, and in the future we may want to support a custom kind if
> there is a request.
>
> On Thu, May 28, 2020 at 8:54 PM Reuven Lax <[email protected]> wrote:
>
>> Which WatermarkEstimator do you think should be used?
>>
>> On Thu, May 28, 2020 at 7:17 PM Boyuan Zhang <[email protected]> wrote:
>>
>>> Hi team,
>>>
>>> I'm Boyuan, currently working on building a Kafka read PTransform on
>>> top of SplittableDoFn [1][2][3]. There are two questions about Kafka
>>> usage I want to discuss with you:
>>>
>>> 1. Compared to KafkaIO.Read
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>,
>>> the SplittableDoFn version of the Kafka read takes TopicPartition and
>>> startReadTime as elements and processes them at execution time, instead
>>> of configuring topics at pipeline construction time. I'm wondering
>>> whether there are other configurations we also want to populate at
>>> execution time rather than construction time. Taking these
>>> configurations as elements adds value when they can differ per
>>> TopicPartition. For the list of configurations we have today, please
>>> refer to KafkaIO.Read
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>.
>>>
>>> 2. I also want to offer a simple way for KafkaIO.Read to expand into
>>> the SDF version of the PTransform. Almost all configurations translate
>>> easily from KafkaIO.Read to the SDF version, except a custom
>>> TimestampPolicyFactory (it's easy to translate the built-in default
>>> types such as withProcessingTime
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L710>,
>>> withCreateTime
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L726>
>>> and withLogAppendTime
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L699>).
>>> With SplittableDoFn, we have WatermarkEstimator
>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java>
>>> to track the watermark per TopicPartition. Thus, instead of a
>>> TimestampPolicyFactory
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java>,
>>> we need the user to provide a function that extracts the output
>>> timestamp from a KafkaRecord (like withTimestampFn
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>).
>>> My questions here are: are the default types enough for current
>>> KafkaIO.Read users? Is a custom TimestampPolicy actually common? And is
>>> it okay to use the existing withTimestampFn
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>
>>> API in KafkaIO.Read to accept the custom function and pass it through
>>> to the SDF read transform?
>>>
>>> Thanks for your help!
>>>
>>> [1] https://beam.apache.org/blog/splittable-do-fn/
>>> [2] https://s.apache.org/splittable-do-fn
>>> [3] My prototype PR: https://github.com/apache/beam/pull/11749
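For illustration, here is a rough sketch of what the SDF read from question 1 could look like, with a MonotonicallyIncreasing estimator tracking the watermark per TopicPartition. The class name, the plain TopicPartition element type, the placeholder restriction, and the elided consumer loop are assumptions for the sketch, not the prototype's actual code.

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Instant;

// Sketch only: reads one TopicPartition per element; the consumer loop is elided.
class ReadFromTopicPartitionFn extends DoFn<TopicPartition, KafkaRecord<byte[], byte[]>> {

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element TopicPartition partition) {
    // Placeholder range; a real implementation would resolve start/end offsets
    // (e.g. from startReadTime) via the consumer.
    return new OffsetRange(0L, Long.MAX_VALUE);
  }

  @GetInitialWatermarkEstimatorState
  public Instant getInitialWatermarkEstimatorState(@Timestamp Instant elementTimestamp) {
    // Each TopicPartition element starts with its own watermark state.
    return elementTimestamp;
  }

  @NewWatermarkEstimator
  public WatermarkEstimators.MonotonicallyIncreasing newWatermarkEstimator(
      @WatermarkEstimatorState Instant state) {
    // One MonotonicallyIncreasing estimator per restriction, i.e. per TopicPartition.
    return new WatermarkEstimators.MonotonicallyIncreasing(state);
  }

  @ProcessElement
  public ProcessContinuation processElement(
      @Element TopicPartition partition,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<KafkaRecord<byte[], byte[]>> receiver) {
    // Poll the consumer, tracker.tryClaim(offset) each record, and emit it with
    // receiver.outputWithTimestamp(record, ts) so the estimator observes ts.
    // Elided here; return resume() or stop() based on tryClaim and poll results.
    return ProcessContinuation.stop();
  }
}

The key point is that the watermark state and estimator are created per restriction, so each TopicPartition advances its watermark independently.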
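And for question 2, a minimal sketch of the kind of timestamp-extraction function a user could supply in place of a custom TimestampPolicyFactory. The class and method names are hypothetical, and the mappings to the built-in policies are only approximate.

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.joda.time.Instant;

// Hypothetical helpers; each returns a function the SDF read would call per record
// to pick the output timestamp, in the spirit of withTimestampFn.
class OutputTimestampFns {

  // Roughly equivalent to withCreateTime / withLogAppendTime: trust the timestamp
  // carried by the KafkaRecord itself.
  static SerializableFunction<KafkaRecord<byte[], byte[]>, Instant> recordTimestamp() {
    return record -> new Instant(record.getTimestamp());
  }

  // Roughly equivalent to withProcessingTime: stamp each record with the time it is read.
  static SerializableFunction<KafkaRecord<byte[], byte[]>, Instant> processingTime() {
    return record -> Instant.now();
  }
}

A SerializableFunction<KafkaRecord<K, V>, Instant> is enough to choose per-record output timestamps, but unlike a TimestampPolicy it has no say in the watermark itself; that responsibility moves to the WatermarkEstimator.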
