Hi Pedro,

Boyuan shared her prototype implementation in [1]. If you're coding a SplittableDoFn, I'd guess the relevant piece of code is ReadViaSDF.java.

Best,
-P.

[1] https://github.com/apache/beam/pull/11749/files
On Mon, Jun 15, 2020 at 10:00 AM Pedro H S Teixeira <pedr...@gmail.com> wrote:
> Hi Boyuan,
>
> Is the implementation (even if incomplete) open source / available at this
> moment?
>
> I'm trying to implement an IO for a custom source here using
> SplittableDoFn, and it would be helpful to see more examples :)
>
> Thanks,
> Pedro
>
> On 2020/05/29 02:16:49, Boyuan Zhang <boyu...@google.com> wrote:
> > Hi team,
> >
> > I'm Boyuan, currently working on building a Kafka read PTransform on top
> > of SplittableDoFn [1][2][3]. There are two questions about Kafka usage I
> > want to discuss with you:
> >
> > 1. Compared to KafkaIO.Read
> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>,
> > the SplittableDoFn Kafka version allows taking TopicPartition and
> > startReadTime as elements and processing them at execution time, instead
> > of configuring topics at pipeline construction time. I'm wondering
> > whether there are other configurations we also want to populate at
> > pipeline execution time instead of construction time. Taking these
> > configurations as elements would add value when they could differ per
> > TopicPartition. For the list of configurations we have now, please refer
> > to KafkaIO.Read
> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>.
> >
> > 2. I also want to offer a simple way for KafkaIO.Read to expand with the
> > SDF version PTransform.
> > Almost all configurations can be translated easily from KafkaIO.Read to
> > the SDF version read, except a custom TimestampPolicyFactory (it's easy
> > to translate built-in default types such as withProcessingTime
> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L710>,
> > withCreateTime
> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L726>,
> > and withLogAppendTime
> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L699>).
> > With SplittableDoFn, we have WatermarkEstimator
> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java>
> > to track the watermark per TopicPartition. Thus, instead of a
> > TimestampPolicyFactory
> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java>,
> > we need the user to provide a function that can extract the output
> > timestamp from a KafkaRecord (like withTimestampFn
> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>).
> > My questions here are: are the default types enough for current
> > KafkaIO.Read users? Is a custom TimestampPolicy really common? And is it
> > okay to use the current withTimestampFn
> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>
> > API in KafkaIO.Read to accept the custom function and pass it to the SDF
> > read transform?
> >
> > Thanks for your help!
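To make the point-2 discussion above concrete: here is a rough, self-contained sketch (plain Java, not the actual Beam API; FakeRecord and MaxTimestampEstimator are made-up stand-ins) of the idea of replacing a TimestampPolicyFactory with a user-supplied timestamp-extraction function plus a per-partition watermark estimator that simply tracks the maximum timestamp observed, roughly what a monotonically increasing WatermarkEstimator would do:

```java
import java.time.Instant;
import java.util.function.Function;

// Illustrative sketch only -- not the Beam API. All names here are hypothetical.
class WatermarkSketch {

  // Stand-in for a KafkaRecord: a payload plus an epoch-millis timestamp.
  record FakeRecord(String payload, long timestampMillis) {}

  // Stand-in for a per-partition WatermarkEstimator: remembers the latest
  // timestamp observed so far.
  static class MaxTimestampEstimator {
    private Instant watermark = Instant.EPOCH;

    void observe(Instant ts) {
      if (ts.isAfter(watermark)) {
        watermark = ts;
      }
    }

    Instant currentWatermark() {
      return watermark;
    }
  }

  public static void main(String[] args) {
    // The user-provided extraction function, analogous in spirit to
    // KafkaIO.Read#withTimestampFn.
    Function<FakeRecord, Instant> timestampFn =
        r -> Instant.ofEpochMilli(r.timestampMillis);

    MaxTimestampEstimator estimator = new MaxTimestampEstimator();
    FakeRecord[] records = {
      new FakeRecord("a", 1_000L),
      new FakeRecord("b", 3_000L),
      new FakeRecord("c", 2_000L) // out-of-order record
    };
    for (FakeRecord r : records) {
      estimator.observe(timestampFn.apply(r));
    }
    // The watermark holds at the max timestamp despite the late record.
    System.out.println(estimator.currentWatermark().toEpochMilli());
  }
}
```

This is only meant to show the shape of the contract (extract timestamp, feed estimator); the real prototype in [3] wires this into the SDF lifecycle.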
> >
> > [1] https://beam.apache.org/blog/splittable-do-fn/
> > [2] https://s.apache.org/splittable-do-fn
> > [3] My prototype PR: https://github.com/apache/beam/pull/11749
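For anyone following along, point 1 of the quoted message (configuration as elements rather than construction-time settings) can be sketched like this. Plain Java, stdlib only, and every name is hypothetical -- the real element type lives in the prototype PR:

```java
import java.time.Instant;
import java.util.List;

// Illustrative sketch only -- not the Beam API. One element describes one
// partition to read, plus per-partition settings, so these can flow through
// the pipeline at execution time instead of being fixed at construction time.
class ReadDescriptorSketch {

  // Hypothetical element type carrying TopicPartition-like info.
  record SourceDescriptor(String topic, int partition, Instant startReadTime) {}

  public static void main(String[] args) {
    // In a real pipeline these elements could be produced dynamically, e.g.
    // by an upstream transform that discovers partitions at runtime.
    List<SourceDescriptor> elements = List.of(
        new SourceDescriptor("orders", 0, Instant.parse("2020-05-29T00:00:00Z")),
        new SourceDescriptor("orders", 1, Instant.parse("2020-05-29T00:00:00Z")));

    for (SourceDescriptor d : elements) {
      // A real SDF's @ProcessElement would open a consumer here and seek to
      // the offset for startReadTime; this sketch just prints the descriptor.
      System.out.println(d.topic() + "-" + d.partition());
    }
  }
}
```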