Hi Boyuan,

Is the implementation (even if incomplete) open source / available at this 
moment?

I'm trying to implement an IO for a custom source using SplittableDoFn,
and it would be helpful to see more examples :)

Thanks,
Pedro


On 2020/05/29 02:16:49, Boyuan Zhang <boyu...@google.com> wrote: 
> Hi team,
> 
> I'm Boyuan, currently working on building a Kafka read PTransform on top of
> SplittableDoFn[1][2][3]. There are two questions about Kafka usage I want
> to discuss with you:
> 
> 1. Compared to KafkaIO.Read
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>,
> the SplittableDoFn Kafka version takes TopicPartition and startReadTime
> as elements and processes them at execution time, instead of configuring
> topics at pipeline construction time. I'm wondering whether there are
> other configurations we would also want to populate at pipeline execution
> time instead of construction time. Taking these configurations as elements
> would add value when they can differ per TopicPartition. For the list of
> configurations we have now, please refer to KafkaIO.Read
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>.
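To make the "configuration as elements" idea concrete, here is a minimal plain-Java sketch. It assumes a hypothetical element type, PartitionDescriptor, carrying a topic, a partition and a startReadTime, plus a hypothetical helper that builds one element per partition. It does not use the Beam API; in an actual SDF read these objects would be the elements of the input PCollection.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical element type (not a Beam class): one per TopicPartition to read.
final class PartitionDescriptor {
  final String topic;
  final int partition;
  final Instant startReadTime; // where reading should begin for this partition

  PartitionDescriptor(String topic, int partition, Instant startReadTime) {
    this.topic = topic;
    this.partition = partition;
    this.startReadTime = startReadTime;
  }

  @Override
  public String toString() {
    return topic + "-" + partition + "@" + startReadTime;
  }
}

// Hypothetical helper that produces the elements at execution time.
final class Descriptors {
  // Builds one element per partition. A partition listed in `overrides` gets its
  // own start time, which a single construction-time setting cannot express.
  static List<PartitionDescriptor> forPartitions(
      String topic, int numPartitions, Instant defaultStart, Map<Integer, Instant> overrides) {
    List<PartitionDescriptor> out = new ArrayList<>();
    for (int p = 0; p < numPartitions; p++) {
      out.add(new PartitionDescriptor(topic, p, overrides.getOrDefault(p, defaultStart)));
    }
    return out;
  }
}
```

Any further per-partition setting that turns out to be useful would become one more field on such an element type.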
> 
> 2. I also want to offer a simple way for KafkaIO.Read to expand into the
> SDF version of the PTransform. Almost all configurations can be translated
> easily from KafkaIO.Read to the SDF read, except a custom
> TimestampPolicyFactory. (It's easy to translate the built-in default types
> such as withProcessingTime
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L710>,
> withCreateTime
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L726>
> and withLogAppendTime
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L699>.)
> With SplittableDoFn, we have WatermarkEstimator
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java>
> to track the watermark per TopicPartition. Thus, instead of a
> TimestampPolicyFactory
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java>,
> we need the user to provide a function that extracts the output timestamp
> from a KafkaRecord (like withTimestampFn
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>).
> My questions are: are the default types enough for current KafkaIO.Read
> users? Is a custom TimestampPolicy really common? Is it okay to use the
> current API withTimestampFn
> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>
> in KafkaIO.Read to accept the custom function and pass it to the SDF read
> transform?
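A minimal plain-Java sketch of the timestamp-function approach follows. SimpleRecord, TimestampFns and PerPartitionWatermarks are hypothetical names, not Beam or Kafka classes. The user supplies a Function from record to output timestamp, and the "watermark" here is simply the largest output timestamp seen so far for each partition, a simplification of what a per-TopicPartition WatermarkEstimator might track.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a consumed Kafka record (not Beam's KafkaRecord).
final class SimpleRecord {
  final String topic;
  final int partition;
  final long offset;
  // Kafka record timestamp; whether it is create time or log-append time
  // depends on the topic's timestamp type.
  final long timestampMillis;

  SimpleRecord(String topic, int partition, long offset, long timestampMillis) {
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;
    this.timestampMillis = timestampMillis;
  }
}

// Hypothetical timestamp functions mirroring the built-in choices.
final class TimestampFns {
  // Uses the timestamp stored in the record itself.
  static Function<SimpleRecord, Instant> recordTime() {
    return r -> Instant.ofEpochMilli(r.timestampMillis);
  }

  // Ignores the record and uses the current wall-clock time.
  static Function<SimpleRecord, Instant> processingTime() {
    return r -> Instant.now();
  }
}

// Applies a user-supplied timestamp function and keeps a per-partition
// watermark that never moves backwards.
final class PerPartitionWatermarks {
  private final Function<SimpleRecord, Instant> timestampFn;
  private final Map<String, Instant> watermarks = new HashMap<>();

  PerPartitionWatermarks(Function<SimpleRecord, Instant> timestampFn) {
    this.timestampFn = timestampFn;
  }

  // Returns the output timestamp for the record and advances its partition's watermark.
  Instant observe(SimpleRecord r) {
    Instant ts = timestampFn.apply(r);
    watermarks.merge(key(r.topic, r.partition), ts, (prev, cur) -> cur.isAfter(prev) ? cur : prev);
    return ts;
  }

  // Partitions with no records yet report Instant.MIN.
  Instant watermark(String topic, int partition) {
    return watermarks.getOrDefault(key(topic, partition), Instant.MIN);
  }

  private static String key(String topic, int partition) {
    return topic + "-" + partition;
  }
}
```

Under this design, a custom TimestampPolicy that only maps a record to a timestamp fits the function shape directly; a policy that also tracks idle partitions or other state would not.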
> 
> Thanks for your help!
> 
> [1] https://beam.apache.org/blog/splittable-do-fn/
> [2] https://s.apache.org/splittable-do-fn
> [3] My prototype PR https://github.com/apache/beam/pull/11749
> 
