Thanks Pablo!
Hi Pedro, as Pablo mentioned, the core PTransform is ReadViaSDF, and the
core DoFn is ReadFromKafkaDoFn. We also have some other IOs in SDF: HBaseIO
<https://github.com/apache/beam/blob/52419e93ee9fa8c823eb505c472969fc7849e247/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java#L38>,
CassandraIO <https://github.com/apache/beam/pull/10546>. Hope this helps : )

On Mon, Jun 15, 2020 at 10:05 AM Pablo Estrada <pabl...@google.com> wrote:

> Hi Pedro,
> Boyuan shared her prototype implementation in [1]. If you're coding a
> SplittableDoFn, I'd guess the relevant piece of code is ReadViaSDF.java
> Best
> -P.
> [1] https://github.com/apache/beam/pull/11749/files
>
> On Mon, Jun 15, 2020 at 10:00 AM Pedro H S Teixeira <pedr...@gmail.com>
> wrote:
>
>> Hi Boyuan,
>>
>> Is the implementation (even if incomplete) open source / available at
>> this moment?
>>
>> Trying to implement an IO for a custom source here using
>> SplittableDoFn, and it would be helpful to see more examples :)
>>
>> Thanks,
>> Pedro
>>
>>
>> On 2020/05/29 02:16:49, Boyuan Zhang <boyu...@google.com> wrote:
>> > Hi team,
>> >
>> > I'm Boyuan, currently working on building a Kafka read PTransform on
>> top of
>> > SplittableDoFn[1][2][3]. There are two questions about Kafka usage I
>> want
>> > to discuss with you:
>> >
>> > 1. Compared to KafkaIO.Read
>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>,
>> > the SplittableDoFn Kafka version allows taking TopicPartition and
>> > startReadTime as elements and processing them at execution time,
>> > instead of configuring topics at pipeline construction time. I'm
>> > wondering whether there are other configurations we also want to
>> > populate at pipeline execution time instead of construction time.
>> > Taking these configurations as elements would add value when they can
>> > differ across TopicPartitions. For the list of configurations we have
>> > now, please refer to KafkaIO.Read
>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>.
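
[Editor's illustrative sketch: the paragraph above can be pictured as a pipeline where TopicPartitions are ordinary elements. The `ReadViaSDF` name comes from the prototype PR; the exact builder methods shown here are assumptions, not the final API.]

```java
// Hypothetical sketch: TopicPartitions arrive as pipeline elements, so they
// can be produced at execution time (e.g. emitted by an upstream transform)
// rather than being fixed at pipeline construction time.
PCollection<TopicPartition> partitions =
    p.apply(Create.of(
        new TopicPartition("logs", 0),
        new TopicPartition("logs", 1)));

// Builder method names below are assumed for illustration only.
PCollection<KafkaRecord<byte[], byte[]>> records =
    partitions.apply(
        ReadViaSDF.<byte[], byte[]>read()
            .withBootstrapServers("broker:9092")
            .withKeyDeserializer(ByteArrayDeserializer.class)
            .withValueDeserializer(ByteArrayDeserializer.class));
```

The point of the sketch is that any configuration modeled this way (start offset, read time, consumer properties) could then vary per TopicPartition, which is not possible when it is baked in at construction time.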
>> >
>> > 2. I also want to offer a simple way for KafkaIO.Read to expand into
>> > the SDF version of the read PTransform. Almost all configurations can
>> > be translated easily from KafkaIO.Read to the SDF version except a
>> > custom TimestampPolicyFactory (it's easy to translate built-in default
>> > types such as withProcessingTime
>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L710>,
>> > withCreateTime
>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L726>
>> > and withLogAppendTime
>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L699>).
>> > With SplittableDoFn, we have WatermarkEstimator
>> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java>
>> > to track the watermark per TopicPartition. Thus, instead of a
>> > TimestampPolicyFactory
>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java>,
>> > we need the user to provide a function which can extract the output
>> > timestamp from a KafkaRecord (like withTimestampFn
>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>).
>> > My questions here are: are the default types enough for current
>> > KafkaIO.Read users? Are custom TimestampPolicies really common? Is it
>> > okay to use the current API withTimestampFn
>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>
>> > in KafkaIO.Read to accept the custom function and pass it to the SDF
>> > read transform?
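
[Editor's illustrative sketch: the per-record function described above, in place of a TimestampPolicy, could look like the following. `SerializableFunction`, `KafkaRecord`, and `Instant` are the existing Java SDK types; treating the wiring as illustrative, not the final API.]

```java
// Illustrative only: extract the output timestamp from each KafkaRecord,
// mirroring the shape that withTimestampFn accepts today. Under the SDF
// design, watermark tracking would be handled separately by a
// WatermarkEstimator per TopicPartition, so only timestamp extraction
// needs to come from the user.
SerializableFunction<KafkaRecord<byte[], byte[]>, Instant> timestampFn =
    record -> new Instant(record.getTimestamp());
```

If the built-in policies (processing time, create time, log-append time) cover most users, this single function may be the only custom hook the SDF read needs to expose.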
>> >
>> > Thanks for your help!
>> >
>> > [1] https://beam.apache.org/blog/splittable-do-fn/
>> > [2] https://s.apache.org/splittable-do-fn
>> > [3] My prototype PR https://github.com/apache/beam/pull/11749
>> >
>>
>
