Thanks, appreciated the links.

I ended up using `Read.from` with an UnboundedSource because of some
challenges with the actual splitting part of SDF.

Perhaps it's trivial knowledge, but I'm sharing it here anyway: when testing
unbounded sources on the direct runner, one has to test with more elements
than UnboundedReadEvaluatorFactory.ARBITRARY_MAX_ELEMENTS (10). I was
attempting some local tests with 5 elements and couldn't understand why
getWatermark wasn't called :)
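
In case it helps anyone else, here is a minimal sketch of how the test input could be sized so that it always goes past that value. It is plain Java with no Beam dependency. The class name, method names and the hand-copied constant are my own assumptions for illustration; the value 10 is only what I saw in the runner code at the time, so please re-check it against the Beam version you use.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for sizing local test input for an unbounded source. */
final class UnboundedTestInput {
  // Assumption: mirrors UnboundedReadEvaluatorFactory.ARBITRARY_MAX_ELEMENTS (10).
  // Copied by hand on purpose; verify it against the Beam version in use.
  static final int ASSUMED_THRESHOLD = 10;

  private UnboundedTestInput() {}

  /** Returns true if a test with this many elements goes past the assumed threshold. */
  static boolean exceedsThreshold(int elementCount) {
    return elementCount > ASSUMED_THRESHOLD;
  }

  /** Builds "element-0", "element-1", ... with `extra` elements beyond the threshold. */
  static List<String> elementsBeyondThreshold(int extra) {
    if (extra < 1) {
      throw new IllegalArgumentException("extra must be at least 1");
    }
    int count = ASSUMED_THRESHOLD + extra;
    List<String> elements = new ArrayList<>(count);
    for (int i = 0; i < count; i++) {
      elements.add("element-" + i);
    }
    return elements;
  }

  public static void main(String[] args) {
    List<String> input = elementsBeyondThreshold(5);
    System.out.println(input.size() + " elements, exceeds threshold: "
        + exceedsThreshold(input.size()));
  }
}
```

The list can then be fed to whatever fake source the test uses; a guard like `exceedsThreshold` at the top of the test makes the requirement explicit instead of leaving a magic number behind.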

On Mon, Jun 15, 2020 at 12:13 PM Boyuan Zhang <boyu...@google.com> wrote:

> Thanks Pablo!
> Hi Pedro, as Pablo mentioned, the core PTransform is ReadViaSDF, and the
> core DoFn is ReadFromKafkaDoFn. We also have some other IOs in SDF:
> HBaseIO
> <https://github.com/apache/beam/blob/52419e93ee9fa8c823eb505c472969fc7849e247/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java#L38>,
> CassandraIO <https://github.com/apache/beam/pull/10546>. Hope this helps
> : )
>
> On Mon, Jun 15, 2020 at 10:05 AM Pablo Estrada <pabl...@google.com> wrote:
>
>> Hi Pedro,
>> Boyuan shared her prototype implementation in [1]. If you're coding a
>> SplittableDoFn, I'd guess the relevant piece of code is ReadViaSDF.java.
>> Best
>> -P.
>> [1] https://github.com/apache/beam/pull/11749/files
>>
>> On Mon, Jun 15, 2020 at 10:00 AM Pedro H S Teixeira <pedr...@gmail.com>
>> wrote:
>>
>>> Hi Boyuan,
>>>
>>> Is the implementation (even if incomplete) open source / available at
>>> this moment?
>>>
>>> I'm trying to implement an IO for a custom source using
>>> SplittableDoFn, and it would be helpful to see more examples :)
>>>
>>> Thanks,
>>> Pedro
>>>
>>>
>>> On 2020/05/29 02:16:49, Boyuan Zhang <boyu...@google.com> wrote:
>>> > Hi team,
>>> >
>>> > I'm Boyuan, currently working on building a Kafka read PTransform on top of
>>> > SplittableDoFn[1][2][3]. There are two questions about Kafka usage I want
>>> > to discuss with you:
>>> >
>>> > 1. Compared to KafkaIO.Read
>>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>,
>>> > the SplittableDoFn Kafka version allows taking TopicPartition and
>>> > startReadTime as elements and processing them at execution time,
>>> > instead of configuring topics at pipeline construction time. I'm wondering
>>> > whether there are other configurations we also want to populate at
>>> > pipeline execution time instead of construction time. Taking these
>>> > configurations as elements would add value when they could be different
>>> > for different TopicPartitions. For a list of the configurations we have now,
>>> > please refer to KafkaIO.Read
>>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>.
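
To make the "configuration as elements" idea concrete, here is a minimal sketch in plain Java, without Beam or Kafka dependencies. `PartitionReadRequest` and its fields are hypothetical names chosen for illustration; they are not the types used in the prototype PR.

```java
import java.time.Instant;
import java.util.Objects;

/** Hypothetical element type: one Kafka partition plus the time to start reading from. */
final class PartitionReadRequest {
  final String topic;
  final int partition;
  final Instant startReadTime;

  PartitionReadRequest(String topic, int partition, Instant startReadTime) {
    if (partition < 0) {
      throw new IllegalArgumentException("partition must be non-negative");
    }
    this.topic = Objects.requireNonNull(topic, "topic");
    this.partition = partition;
    this.startReadTime = Objects.requireNonNull(startReadTime, "startReadTime");
  }

  @Override
  public boolean equals(Object other) {
    if (!(other instanceof PartitionReadRequest)) {
      return false;
    }
    PartitionReadRequest that = (PartitionReadRequest) other;
    return topic.equals(that.topic)
        && partition == that.partition
        && startReadTime.equals(that.startReadTime);
  }

  @Override
  public int hashCode() {
    return Objects.hash(topic, partition, startReadTime);
  }

  @Override
  public String toString() {
    return topic + "-" + partition + "@" + startReadTime;
  }

  public static void main(String[] args) {
    // Each element carries its own start time, so two partitions of the
    // same topic can begin reading from different points.
    PartitionReadRequest first =
        new PartitionReadRequest("orders", 0, Instant.parse("2020-05-01T00:00:00Z"));
    PartitionReadRequest second =
        new PartitionReadRequest("orders", 1, Instant.parse("2020-05-15T00:00:00Z"));
    System.out.println(first);
    System.out.println(second);
  }
}
```

Any other setting that may differ per partition would become one more field on such an element, which is the question the paragraph above raises.
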
>>> >
>>> > 2. I also want to offer a simple way for KafkaIO.Read to expand with the
>>> > SDF version PTransform. Almost all configurations can be translated easily
>>> > from KafkaIO.Read to the SDF version read, except a custom
>>> > TimestampPolicyFactory. (It's easy to translate the built-in default types
>>> > such as withProcessingTime
>>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L710>,
>>> > withCreateTime
>>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L726>
>>> > and withLogAppendTime
>>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L699>.)
>>> > With SplittableDoFn, we have WatermarkEstimator
>>> > <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java>
>>> > to track the watermark per TopicPartition. Thus, instead of a
>>> > TimestampPolicyFactory
>>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java>,
>>> > we need the user to provide a function which can extract the output
>>> > timestamp from a KafkaRecord (like withTimestampFn
>>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>).
>>> > My question here is: are the default types enough for current KafkaIO.Read
>>> > users? Is a custom TimestampPolicy really common? Is it okay to use the
>>> > current API withTimestampFn
>>> > <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>
>>> > in KafkaIO.Read to accept the custom function and populate it to the SDF
>>> > read transform?
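
A rough sketch of the shape described above: a user-supplied function extracts the output timestamp from each record, and a watermark is tracked separately per partition. This is plain Java and does not use Beam's WatermarkEstimator or Kafka's record types. `SketchRecord` and `PerPartitionWatermarks` are hypothetical names, and "watermark = largest timestamp seen so far" is a simplifying assumption, not necessarily what the prototype does.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical stand-in for a Kafka record; only the fields this sketch needs. */
final class SketchRecord {
  final String topic;
  final int partition;
  final long createTimeMillis;

  SketchRecord(String topic, int partition, long createTimeMillis) {
    this.topic = Objects.requireNonNull(topic, "topic");
    this.partition = partition;
    this.createTimeMillis = createTimeMillis;
  }
}

/** Tracks, per partition, the largest extracted timestamp as a simple watermark. */
final class PerPartitionWatermarks {
  private final Function<SketchRecord, Instant> timestampFn;
  private final Map<String, Instant> watermarks = new HashMap<>();

  PerPartitionWatermarks(Function<SketchRecord, Instant> timestampFn) {
    this.timestampFn = Objects.requireNonNull(timestampFn, "timestampFn");
  }

  private static String key(String topic, int partition) {
    return topic + "-" + partition;
  }

  /** Extracts the output timestamp and advances that partition's watermark, never backwards. */
  Instant observe(SketchRecord record) {
    Instant timestamp = timestampFn.apply(record);
    watermarks.merge(
        key(record.topic, record.partition),
        timestamp,
        (current, candidate) -> candidate.isAfter(current) ? candidate : current);
    return timestamp;
  }

  /** Returns the watermark for a partition, or null if no record has been observed for it. */
  Instant watermarkFor(String topic, int partition) {
    return watermarks.get(key(topic, partition));
  }

  public static void main(String[] args) {
    // The lambda plays the role of the user-provided timestamp function.
    PerPartitionWatermarks tracker =
        new PerPartitionWatermarks(r -> Instant.ofEpochMilli(r.createTimeMillis));
    tracker.observe(new SketchRecord("orders", 0, 1_000L));
    tracker.observe(new SketchRecord("orders", 0, 500L)); // out of order, watermark stays
    tracker.observe(new SketchRecord("orders", 1, 700L));
    System.out.println(tracker.watermarkFor("orders", 0));
    System.out.println(tracker.watermarkFor("orders", 1));
  }
}
```

A real estimator would also have to deal with idle partitions and late records, which this sketch ignores.
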
>>> >
>>> > Thanks for your help!
>>> >
>>> > [1] https://beam.apache.org/blog/splittable-do-fn/
>>> > [2] https://s.apache.org/splittable-do-fn
>>> > [3] My prototype PR https://github.com/apache/beam/pull/11749
>>> >
>>>
>>
