Hi team,

I'm Boyuan, currently working on building a Kafka read PTransform on top of
SplittableDoFn[1][2][3]. There are two questions about Kafka usage I want
to discuss with you:

1.  Compared to KafkaIO.Read
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>,
the SplittableDoFn Kafka version takes TopicPartition and startReadTime as
elements and processes them at execution time, instead of configuring
topics at pipeline construction time. I'm wondering whether there are other
configurations we also want to populate at pipeline execution time rather
than at construction time. Taking these configurations as elements adds
value when they can differ per TopicPartition. For the list of
configurations we have today, please refer to KafkaIO.Read
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>.
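To make the question concrete, here is a rough sketch of what a
per-partition element could look like. The class name and fields here are
purely illustrative assumptions on my side, not the actual Beam API; the
point is that anything modeled as a field of the element (rather than as a
transform configuration) can vary per TopicPartition at execution time.

```java
import java.util.Objects;

// Hypothetical descriptor element for an SDF-based Kafka read. Each instance
// describes one partition to read, so fields like startReadTime can differ
// per TopicPartition instead of being fixed at pipeline construction time.
public class KafkaSourceDescriptor {
  private final String topic;
  private final int partition;
  private final long startReadTimeMillis; // epoch millis to start reading from

  public KafkaSourceDescriptor(String topic, int partition, long startReadTimeMillis) {
    this.topic = Objects.requireNonNull(topic);
    this.partition = partition;
    this.startReadTimeMillis = startReadTimeMillis;
  }

  public String getTopic() { return topic; }
  public int getPartition() { return partition; }
  public long getStartReadTimeMillis() { return startReadTimeMillis; }
}
```

Any configuration we decide belongs at execution time would become another
field of this element; everything else stays on the transform.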

2. I also want to offer a simple way for KafkaIO.Read to expand into the
SDF version of the PTransform. Almost all configurations translate easily
from KafkaIO.Read to the SDF version, except a custom
TimestampPolicyFactory (it's easy to translate the built-in default types
such as withProcessingTime
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L710>,
withCreateTime
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L726>
and withLogAppendTime
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L699>).
With SplittableDoFn, we have WatermarkEstimator
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java>
to track the watermark per TopicPartition. Thus, instead of a
TimestampPolicyFactory
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java>,
we need the user to provide a function that extracts the output timestamp
from a KafkaRecord (like withTimestampFn
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>).
My questions here are: are the built-in default types enough for current
KafkaIO.Read users? How common is a custom TimestampPolicy in practice? Is
it okay to use the existing withTimestampFn
<https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>
API in KafkaIO.Read to accept the custom function and propagate it to the
SDF read transform?
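For clarity, the kind of function I have in mind is just "record in,
timestamp out", in the spirit of withTimestampFn. The sketch below uses a
made-up record class (FakeKafkaRecord) as a stand-in for KafkaRecord, since
the real class lives in the Kafka IO module; only the shape of the function
matters here.

```java
import java.time.Instant;
import java.util.function.Function;

// Stand-in for a Kafka record carrying an embedded timestamp; the real
// KafkaRecord class lives in Beam's Kafka IO module.
class FakeKafkaRecord {
  final long timestampMillis;
  FakeKafkaRecord(long timestampMillis) { this.timestampMillis = timestampMillis; }
}

public class TimestampFnExample {
  // A user-provided timestamp function: given a record, return the output
  // timestamp the SDF read should assign to that element.
  static final Function<FakeKafkaRecord, Instant> timestampFn =
      record -> Instant.ofEpochMilli(record.timestampMillis);

  public static void main(String[] args) {
    Instant ts = timestampFn.apply(new FakeKafkaRecord(1_000L));
    System.out.println(ts); // 1970-01-01T00:00:01Z
  }
}
```

If a plain function like this is expressive enough for the custom cases
people actually have, the expansion from KafkaIO.Read becomes mechanical;
if people rely on the stateful parts of TimestampPolicy, it isn't.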

Thanks for your help!

[1] https://beam.apache.org/blog/splittable-do-fn/
[2] https://s.apache.org/splittable-do-fn
[3] My prototype PR https://github.com/apache/beam/pull/11749
