Hi Pedro,
Boyuan shared her prototype implementation in [1]. If you're coding a
SplittableDoFn, I'd guess the relevant piece of code is ReadViaSDF.java.
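
In case a bare-bones example helps while you read through that, a minimal
bounded SplittableDoFn for a custom source looks roughly like this (MySource,
MyRecord, recordCount() and readAt() are placeholders for your own source
API, not anything in Beam):

  import org.apache.beam.sdk.io.range.OffsetRange;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

  @DoFn.BoundedPerElement
  class ReadFromMySource extends DoFn<MySource, MyRecord> {

    // Describe the whole source as an offset range; the runner can later
    // split this restriction to parallelize the read.
    @GetInitialRestriction
    public OffsetRange initialRestriction(@Element MySource source) {
      return new OffsetRange(0, source.recordCount());
    }

    // Claim each position before emitting its record; stopping as soon as a
    // claim fails is what lets the runner checkpoint and split remaining work.
    @ProcessElement
    public void process(
        @Element MySource source,
        RestrictionTracker<OffsetRange, Long> tracker,
        OutputReceiver<MyRecord> out) {
      for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
        out.output(source.readAt(i));
      }
    }
  }

If I read it right, ReadViaSDF.java in [1] follows the same general shape,
with the restriction tracking Kafka offsets per TopicPartition.
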
Best
-P.
[1] https://github.com/apache/beam/pull/11749/files

On Mon, Jun 15, 2020 at 10:00 AM Pedro H S Teixeira <pedr...@gmail.com>
wrote:

> Hi Boyuan,
>
> Is the implementation (even if incomplete) open source / available at this
> moment?
>
> Trying to implement an IO to a custom source here using SplittableDoFn,
> and it would be helpful to see more examples :)
>
> Thanks,
> Pedro
>
>
> On 2020/05/29 02:16:49, Boyuan Zhang <boyu...@google.com> wrote:
> > Hi team,
> >
> > I'm Boyuan, currently working on building a Kafka read PTransform on top
> of
> > SplittableDoFn[1][2][3]. There are two questions about Kafka usage I want
> > to discuss with you:
> >
> > 1.  Compared to KafkaIO.Read
> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>,
> > the SplittableDoFn Kafka version allows taking TopicPartition and
> > startReadTime as elements and processing them during execution time,
> > instead of configuring topics at pipeline construction time. I'm wondering
> > whether there are other configurations we also want to populate during
> > pipeline execution time instead of construction time. Taking these
> > configurations as elements would add value when they could differ across
> > TopicPartitions. For the list of configurations we have today, please
> > refer to KafkaIO.Read
> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>.
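> >
> > To make (1) concrete, here is a rough sketch of the usage I'm imagining.
> > ReadViaSDF is the transform in my prototype PR [3]; the element shape,
> > builder methods and coders below are placeholders rather than a settled
> > API:
> >
> >   // Which partitions to read, and from when, become ordinary pipeline
> >   // elements rather than construction-time configuration.
> >   Instant startReadTime = Instant.parse("2020-06-01T00:00:00Z");
> >   PCollection<KV<TopicPartition, Instant>> sources =
> >       p.apply(
> >           Create.of(
> >                   KV.of(new TopicPartition("topic_a", 0), startReadTime),
> >                   KV.of(new TopicPartition("topic_a", 1), startReadTime))
> >               .withCoder(KvCoder.of(
> >                   SerializableCoder.of(TopicPartition.class), InstantCoder.of())));
> >
> >   sources.apply(
> >       ReadViaSDF.<byte[], byte[]>read()                      // transform from the prototype PR
> >           .withBootstrapServers("broker:9092")               // placeholder builder methods
> >           .withKeyDeserializer(ByteArrayDeserializer.class)
> >           .withValueDeserializer(ByteArrayDeserializer.class));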
> >
> > 2. I also want to offer a simple way to expand KafkaIO.Read into the SDF
> > version of the read PTransform. Almost all configurations can be easily
> > translated from KafkaIO.Read to the SDF version, except a custom
> > TimestampPolicyFactory (it's easy to translate the built-in default types
> > such as withProcessingTime
> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L710>,
> > withCreateTime
> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L726>
> > and withLogAppendTime
> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L699>).
> > With SplittableDoFn, we have WatermarkEstimator
> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java>
> > to track the watermark per TopicPartition. Thus, instead of a
> > TimestampPolicyFactory
> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java>,
> > we need the user to provide a function which extracts the output timestamp
> > from a KafkaRecord (like withTimestampFn
> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>).
> > My questions here are: are the default types enough for current
> > KafkaIO.Read users? How common is a custom TimestampPolicy in practice?
> > And is it okay to use the current withTimestampFn
> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>
> > API in KafkaIO.Read to accept the custom function and pass it through to
> > the SDF read transform?
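> >
> > For what it's worth, the shape I have in mind is just a per-record
> > SerializableFunction from the consumed KafkaRecord to its output
> > timestamp, for example (a sketch, not a committed API):
> >
> >   // Extract the output timestamp directly from each consumed record; the
> >   // per-partition watermark is then handled by the WatermarkEstimator, so
> >   // no cross-record TimestampPolicy state is needed.
> >   SerializableFunction<KafkaRecord<byte[], byte[]>, Instant> outputTimestampFn =
> >       record -> new Instant(record.getTimestamp());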
> >
> > Thanks for your help!
> >
> > [1] https://beam.apache.org/blog/splittable-do-fn/
> > [2] https://s.apache.org/splittable-do-fn
> > [3] My prototype PR https://github.com/apache/beam/pull/11749
> >
>
