This is per-partition, right? In that case I assume it will match the
current Kafka watermark.

On Thu, May 28, 2020 at 9:03 PM Boyuan Zhang <[email protected]> wrote:

> Hi Reuven,
>
> I'm going to use MonotonicallyIncreasing
> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimators.java#L105>
> by default; in the future, we may want to support custom kinds if there is a
> request.
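>
> For reference, here is a rough sketch (not the actual PR code; the class name
> and the KafkaSourceDescriptor element type are placeholders) of how the default
> estimator would plug into the splittable DoFn, one estimator per restriction,
> i.e. per TopicPartition:
>
>   import org.apache.beam.sdk.io.kafka.KafkaRecord;
>   import org.apache.beam.sdk.transforms.DoFn;
>   import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
>   import org.joda.time.Instant;
>
>   // Skeleton only: @ProcessElement, restriction tracker, etc. are omitted.
>   class ReadFromKafkaViaSDF extends DoFn<KafkaSourceDescriptor, KafkaRecord<byte[], byte[]>> {
>
>     // Seed the estimator state with the element's timestamp.
>     @GetInitialWatermarkEstimatorState
>     public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
>       return currentElementTimestamp;
>     }
>
>     // A fresh MonotonicallyIncreasing estimator is created for each restriction,
>     // so the watermark is tracked independently for every TopicPartition.
>     @NewWatermarkEstimator
>     public WatermarkEstimators.MonotonicallyIncreasing newWatermarkEstimator(
>         @WatermarkEstimatorState Instant watermarkEstimatorState) {
>       return new WatermarkEstimators.MonotonicallyIncreasing(watermarkEstimatorState);
>     }
>   }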
>
> On Thu, May 28, 2020 at 8:54 PM Reuven Lax <[email protected]> wrote:
>
>> Which WatermarkEstimator do you think should be used?
>>
>> On Thu, May 28, 2020 at 7:17 PM Boyuan Zhang <[email protected]> wrote:
>>
>>> Hi team,
>>>
>>> I'm Boyuan, currently working on building a Kafka read PTransform on top
>>> of SplittableDoFn[1][2][3]. There are two questions about Kafka usage I
>>> want to discuss with you:
>>>
>>> 1.  Compared to the KafkaIO.Read
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>,
>>> the SplittableDoFn Kafka version takes TopicPartition and startReadTime as
>>> elements and processes them at execution time, instead of configuring topics
>>> at pipeline construction time (see the first sketch below). I'm wondering
>>> whether there are other configurations we also want to populate at pipeline
>>> execution time instead of construction time. Taking these configurations as
>>> elements would add value when they can differ across TopicPartitions. For the
>>> list of configurations we have now, please refer to KafkaIO.Read
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351>.
>>>
>>> 2. I also want to offer a simple way for KafkaIO.Read to expand into the SDF
>>> version of the read PTransform. Almost all configurations can be translated
>>> easily from KafkaIO.Read to the SDF version except a custom
>>> TimestampPolicyFactory (it's easy to translate built-in default types such
>>> as withProcessingTime
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L710>,
>>> withCreateTime
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L726>
>>> and withLogAppendTime
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L699>).
>>> With SplittableDoFn, we have WatermarkEstimator
>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java>
>>> to track the watermark per TopicPartition. Thus, instead of a
>>> TimestampPolicyFactory
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java>,
>>> we need the user to provide a function that extracts the output timestamp
>>> from a KafkaRecord (like withTimestampFn
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>),
>>> as in the second sketch below. My questions here are: are the default types
>>> enough for current KafkaIO.Read users? Is a custom TimestampPolicy actually
>>> common? And is it okay to use the current withTimestampFn API
>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780>
>>> in KafkaIO.Read to accept the custom function and pass it through to the SDF
>>> read transform?
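>>>
>>> First sketch (for question 1): what consuming TopicPartitions as elements
>>> could look like. KafkaSourceDescriptor and ReadFromKafkaViaSDF are placeholder
>>> names for illustration, not a settled API:
>>>
>>>   // Hypothetical: one descriptor element per partition to read.
>>>   Instant startReadTime = Instant.parse("2020-05-28T00:00:00Z");
>>>   pipeline
>>>       .apply(
>>>           Create.of(
>>>               KafkaSourceDescriptor.of(new TopicPartition("topic-a", 0), startReadTime),
>>>               KafkaSourceDescriptor.of(new TopicPartition("topic-b", 3), startReadTime)))
>>>       // Each descriptor is processed at execution time by the splittable DoFn,
>>>       // so the partitions (and any per-partition configuration carried on the
>>>       // descriptor) do not have to be fixed at construction time.
>>>       .apply(ParDo.of(new ReadFromKafkaViaSDF())); // consumer config wiring omitted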
>>>
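>>> Second sketch (for question 2): the kind of per-record timestamp function the
>>> user would provide instead of a TimestampPolicyFactory. The functions below
>>> are illustrative only; roughly, the built-in policies map onto extractors
>>> like these:
>>>
>>>   import org.apache.beam.sdk.io.kafka.KafkaRecord;
>>>   import org.apache.beam.sdk.transforms.SerializableFunction;
>>>   import org.joda.time.Instant;
>>>
>>>   class TimestampFns {
>>>     // Custom extraction: pull the output timestamp out of the record itself
>>>     // (KafkaRecord.getTimestamp() returns epoch millis).
>>>     static final SerializableFunction<KafkaRecord<byte[], byte[]>, Instant> FROM_RECORD =
>>>         record -> new Instant(record.getTimestamp());
>>>
>>>     // Rough equivalent of withProcessingTime: stamp records at processing time.
>>>     static final SerializableFunction<KafkaRecord<byte[], byte[]>, Instant> PROCESSING_TIME =
>>>         record -> Instant.now();
>>>   }
>>>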
>>> Thanks for your help!
>>>
>>> [1] https://beam.apache.org/blog/splittable-do-fn/
>>> [2] https://s.apache.org/splittable-do-fn
>>> [3] My prototype PR https://github.com/apache/beam/pull/11749
>>>
>>
