Thanks Pablo! Hi Pedro, as Pablo mentioned, the core PTransform is ReadViaSDF, and the core DoFn is ReadFromKafkaDoFn. We also have some other IOs built on SDF: HBaseIO <https://github.com/apache/beam/blob/52419e93ee9fa8c823eb505c472969fc7849e247/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java#L38> and CassandraIO <https://github.com/apache/beam/pull/10546>. Hope this helps :)
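Since you're building a custom-source SDF, the central moving part in all of those examples is a restriction tracker that claims positions before processing them and can split off unprocessed work. Below is a standalone, dependency-free sketch of that claim/split logic over Kafka-style offsets; the names (OffsetRange, OffsetRangeTrackerSketch) are made up for illustration and this is not Beam's actual OffsetRangeTracker, which lives in org.apache.beam.sdk.transforms.splittabledofn:

```java
// Sketch only: mimics the tryClaim/trySplit contract of an SDF restriction
// tracker over a half-open offset range [from, to). Not Beam's real API.
class OffsetRange {
    final long from; // inclusive start offset
    final long to;   // exclusive end offset

    OffsetRange(long from, long to) {
        if (from > to) throw new IllegalArgumentException("from > to");
        this.from = from;
        this.to = to;
    }
}

class OffsetRangeTrackerSketch {
    private OffsetRange range;
    private long lastClaimed = -1; // last offset successfully claimed

    OffsetRangeTrackerSketch(OffsetRange range) {
        this.range = range;
    }

    // Claim an offset before emitting the record at that offset.
    // Returning false tells the DoFn to stop processing this restriction.
    boolean tryClaim(long offset) {
        if (offset >= range.to) {
            return false;
        }
        lastClaimed = offset;
        return true;
    }

    // Split off the tail of the not-yet-claimed range so another worker can
    // take it over. Returns the residual range, or null if nothing remains.
    OffsetRange trySplit(double fractionOfRemainder) {
        long unprocessedStart = lastClaimed + 1;
        if (unprocessedStart >= range.to) {
            return null;
        }
        long splitPos = unprocessedStart
            + Math.max(1, (long) ((range.to - unprocessedStart) * fractionOfRemainder));
        if (splitPos >= range.to) {
            return null;
        }
        OffsetRange residual = new OffsetRange(splitPos, range.to);
        range = new OffsetRange(range.from, splitPos); // keep the primary
        return residual;
    }
}
```

The real tracker also has to handle unbounded ranges (a live partition has no end offset), but the claim-then-process discipline above is the part every SDF read shares.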
On Mon, Jun 15, 2020 at 10:05 AM Pablo Estrada <pabl...@google.com> wrote:

> Hi Pedro,
> Boyuan shared her prototype implementation in [1]. If you're coding a
> SplittableDoFn, I'd guess the relevant piece of code is ReadViaSDF.java
> Best
> -P.
> [1] https://github.com/apache/beam/pull/11749/files
>
> On Mon, Jun 15, 2020 at 10:00 AM Pedro H S Teixeira <pedr...@gmail.com>
> wrote:
>
>> Hi Boyuan,
>>
>> Is the implementation (even if incomplete) open source / available at
>> this moment?
>>
>> I'm trying to implement an IO to a custom source using SplittableDoFn,
>> and it would be helpful to see more examples :)
>>
>> Thanks,
>> Pedro
>>
>> On 2020/05/29 02:16:49, Boyuan Zhang <boyu...@google.com> wrote:
>> > Hi team,
>> >
>> > I'm Boyuan, currently working on building a Kafka read PTransform on
>> > top of SplittableDoFn [1][2][3]. There are two questions about Kafka
>> > usage I want to discuss with you:
>> >
>> > 1. Compared to KafkaIO.Read
>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>,
>> > the SplittableDoFn Kafka version takes TopicPartition and
>> > startReadTime as elements and processes them at execution time,
>> > instead of configuring topics at pipeline construction time. I'm
>> > wondering whether there are other configurations we also want to
>> > populate at pipeline execution time instead of at construction time.
>> > Taking these configurations as elements would add value when they can
>> > differ per TopicPartition. For the list of configurations we have
>> > now, please refer to KafkaIO.Read.
>> >
>> > 2. I also want to offer a simple way for KafkaIO.Read to expand into
>> > the SDF version of the PTransform. Almost all configurations can be
>> > translated easily from KafkaIO.Read to the SDF version of the read,
>> > except a custom TimestampPolicyFactory (it's easy to translate the
>> > built-in default types such as withProcessingTime
>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L710>,
>> > withCreateTime
>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L726>
>> > and withLogAppendTime
>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L699>).
>> > With SplittableDoFn, we have WatermarkEstimator
>> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java>
>> > to track the watermark per TopicPartition. Thus, instead of a
>> > TimestampPolicyFactory
>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java>,
>> > we need the user to provide a function that can extract the output
>> > timestamp from a KafkaRecord (like withTimestampFn
>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>).
>> > My questions here are: are the default types enough for current
>> > KafkaIO.Read users? How common is a custom TimestampPolicy? And is it
>> > okay to use the current API withTimestampFn in KafkaIO.Read to accept
>> > the custom function and pass it to the SDF read transform?
>> >
>> > Thanks for your help!
>> >
>> > [1] https://beam.apache.org/blog/splittable-do-fn/
>> > [2] https://s.apache.org/splittable-do-fn
>> > [3] My prototype PR: https://github.com/apache/beam/pull/11749
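To make question 2 in the quoted mail concrete: the proposal is that the user supplies only a timestamp-extraction function (as withTimestampFn does today), and the SDF read pairs it with a per-TopicPartition watermark estimator. Below is a standalone, dependency-free sketch of a monotonically increasing estimator fed by such a function; the class names are hypothetical and this is not Beam's actual WatermarkEstimator interface:

```java
import java.time.Instant;
import java.util.function.Function;

// Sketch only: a watermark that only moves forward, so late or
// out-of-order records cannot regress it. One instance would be kept
// per TopicPartition in the SDF read.
class MonotonicWatermarkSketch {
    private Instant watermark = Instant.EPOCH;

    // Observe one record's event timestamp and advance the watermark
    // if (and only if) it moved forward.
    void observe(Instant recordTimestamp) {
        if (recordTimestamp.isAfter(watermark)) {
            watermark = recordTimestamp;
        }
    }

    Instant currentWatermark() {
        return watermark;
    }
}

// Sketch of how a user-supplied timestamp function (the withTimestampFn
// idea) would drive the estimator: the reader extracts the output
// timestamp per record and feeds it to the per-partition watermark.
class PartitionReaderSketch<RecordT> {
    private final Function<RecordT, Instant> timestampFn; // user-supplied
    private final MonotonicWatermarkSketch estimator = new MonotonicWatermarkSketch();

    PartitionReaderSketch(Function<RecordT, Instant> timestampFn) {
        this.timestampFn = timestampFn;
    }

    // Called once per record: returns the output timestamp and advances
    // this partition's watermark as a side effect.
    Instant process(RecordT record) {
        Instant ts = timestampFn.apply(record);
        estimator.observe(ts);
        return ts;
    }

    Instant watermark() {
        return estimator.currentWatermark();
    }
}
```

Under this sketch, the built-in policies (processing time, create time, log-append time) all reduce to different choices of timestampFn, which is why translating them from KafkaIO.Read is straightforward while a fully custom TimestampPolicy is the awkward case.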