Hi Reuven, I'm going to use MonotonicallyIncreasing <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java#L105> by default, and in the future we may want to support custom kinds if there is a request.
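For context, the monotonically increasing estimator simply reports the largest timestamp observed so far as the watermark, so out-of-order records can never move it backwards. A minimal standalone sketch of that behavior (the class and method names here are illustrative, not Beam's actual API):

```java
// Standalone sketch of a monotonically increasing watermark estimator.
// Mirrors the idea behind Beam's WatermarkEstimators.MonotonicallyIncreasing;
// this class is an illustration, not the Beam implementation.
public class MonotonicWatermarkSketch {
    private long watermarkMillis;

    public MonotonicWatermarkSketch(long initialMillis) {
        this.watermarkMillis = initialMillis;
    }

    // Observing a record timestamp can only move the watermark forward.
    public void observe(long timestampMillis) {
        if (timestampMillis > watermarkMillis) {
            watermarkMillis = timestampMillis;
        }
    }

    public long currentWatermark() {
        return watermarkMillis;
    }
}
```

The key property is that a late record (one with a timestamp below the current watermark) is simply ignored for watermark purposes, which keeps the watermark monotone per TopicPartition.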
On Thu, May 28, 2020 at 8:54 PM Reuven Lax <re...@google.com> wrote:

> Which WatermarkEstimator do you think should be used?
>
> On Thu, May 28, 2020 at 7:17 PM Boyuan Zhang <boyu...@google.com> wrote:
>
>> Hi team,
>>
>> I'm Boyuan, currently working on building a Kafka read PTransform on top of SplittableDoFn [1][2][3]. There are two questions about Kafka usage I want to discuss with you:
>>
>> 1. Compared to KafkaIO.Read <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>, the SplittableDoFn Kafka version takes TopicPartition and startReadTime as elements and processes them at execution time, instead of configuring topics at pipeline construction time. I'm wondering whether there are other configurations we also want to populate at pipeline execution time rather than construction time. Taking these configurations as elements would add value when they can differ across TopicPartitions. For the list of configurations we have now, please refer to KafkaIO.Read <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>.
>>
>> 2. I also want to offer a simple way for KafkaIO.Read to expand into the SDF version of the PTransform. Almost all configurations translate easily from KafkaIO.Read to the SDF version except a custom TimestampPolicyFactory (it's easy to translate built-in default types such as withProcessingTime <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L710>, withCreateTime <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L726>, and withLogAppendTime <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L699>).
>> With SplittableDoFn, we have WatermarkEstimator <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java> to track the watermark per TopicPartition. Thus, instead of a TimestampPolicyFactory <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java>, we need the user to provide a function that extracts the output timestamp from a KafkaRecord (like withTimestampFn <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>). My question here is: are the default types enough for current KafkaIO.Read users, or is a custom TimestampPolicy commonly needed? Is it okay to use the current API withTimestampFn <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780> in KafkaIO.Read to accept the custom function and pass it through to the SDF read transform?
>>
>> Thanks for your help!
>>
>> [1] https://beam.apache.org/blog/splittable-do-fn/
>> [2] https://s.apache.org/splittable-do-fn
>> [3] My prototype PR: https://github.com/apache/beam/pull/11749
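On point 1 above, one way to picture "TopicPartition and startReadTime as elements" is a small descriptor value that flows through the pipeline, one per partition to read. The class below is a hypothetical sketch with illustrative field names; the prototype PR defines its own descriptor type:

```java
// Hypothetical element type carrying per-partition read configuration.
// Field names are illustrative only; the actual SDF Kafka read in the
// prototype PR defines its own descriptor.
public class ReadDescriptorSketch {
    public final String topic;
    public final int partition;
    public final long startReadTimeMillis; // earliest record timestamp to read from

    public ReadDescriptorSketch(String topic, int partition, long startReadTimeMillis) {
        this.topic = topic;
        this.partition = partition;
        this.startReadTimeMillis = startReadTimeMillis;
    }

    @Override
    public String toString() {
        return topic + "-" + partition + "@" + startReadTimeMillis;
    }
}
```

Because each element carries its own configuration, two partitions in the same pipeline can, for example, start reading from different times, which is exactly the case where element-level configuration beats construction-time configuration.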
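On point 2, the proposal replaces a TimestampPolicyFactory with a per-record timestamp function in the spirit of withTimestampFn. A minimal sketch of what such an extractor could look like, where the record stand-in and the function shape are assumptions rather than the actual KafkaIO types:

```java
import java.util.function.ToLongFunction;

// Minimal stand-in for a Kafka record; the real KafkaRecord carries
// a key, value, headers, offset, and more. Illustrative only.
class RecordSketch {
    final long logAppendTimeMillis;
    final long createTimeMillis;

    RecordSketch(long logAppendTimeMillis, long createTimeMillis) {
        this.logAppendTimeMillis = logAppendTimeMillis;
        this.createTimeMillis = createTimeMillis;
    }
}

public class TimestampFnSketch {
    // A user-supplied function extracting the output timestamp from a record,
    // analogous in spirit to KafkaIO's withTimestampFn; the built-in policies
    // (create time, log-append time) become one-line lambdas in this model.
    public static long extract(RecordSketch r, ToLongFunction<RecordSketch> fn) {
        return fn.applyAsLong(r);
    }
}
```

Under this model the built-in defaults and a custom policy share one API shape: users just pass a different function, and the SDF read feeds the result to its per-partition WatermarkEstimator.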