There are really two cases that make sense:

(1) We read the event timestamps from the Kafka records themselves and
have some external knowledge that guarantees (or at least provides a
very good heuristic for) what the timestamps of not-yet-read messages
can be, and use that to set the watermark. This could, for example,
rely on knowing that the timestamps within a partition are
monotonically increasing, or that they have bounded skew.

(2) We use processing time both as the watermark and as the event
timestamp on produced messages. From that point on we can safely
reason about event time.
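
Concretely, something along the lines of the following KafkaIO sketch
(the topic, how the event time is parsed out of the record value, and
the 2-minute skew bound are only illustrative assumptions):

  import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
  import org.apache.beam.sdk.io.kafka.KafkaIO;
  import org.apache.beam.sdk.io.kafka.KafkaRecord;
  import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
  import org.apache.kafka.common.serialization.StringDeserializer;
  import org.joda.time.Duration;
  import org.joda.time.Instant;

  // (1) event time is read from the record itself; the watermark is held
  //     back by an assumed bound on skew (2 minutes here)
  KafkaIO.Read<String, String> boundedSkew =
      KafkaIO.<String, String>read()
          .withBootstrapServers("broker:9092")
          .withTopic("events")
          .withKeyDeserializer(StringDeserializer.class)
          .withValueDeserializer(StringDeserializer.class)
          .withTimestampPolicyFactory(
              (tp, previousWatermark) ->
                  new CustomTimestampPolicyWithLimitedDelay<>(
                      (KafkaRecord<String, String> r) ->
                          Instant.parse(r.getKV().getValue()), // event time carried in the value
                      Duration.standardMinutes(2),
                      previousWatermark));

  // (2) processing time is used both as the watermark and as the element
  //     timestamp, so the two cannot diverge
  KafkaIO.Read<String, String> processingTime =
      boundedSkew.withTimestampPolicyFactory(
          TimestampPolicyFactory.withProcessingTime());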

The current state seems a bit broken if I understand correctly.

On Tue, Oct 31, 2023 at 1:16 AM Jan Lukavský <je...@seznam.cz> wrote:
>
> I think that instead of deprecating it and creating a new version, we could 
> leverage the proposed update compatibility flag for this [1]. I still have 
> some doubts whether the processing-time watermarking (and event-time 
> assignment) makes sense. Do we have a valid use-case for that? This is 
> actually the removed SYNCHRONIZED_PROCESSING_TIME time domain, which is 
> problematic - restarts of Pipelines cause timestamps to change and hence make 
> *every* DoFn potentially non-deterministic, which would be an unexpected 
> side-effect. This makes me wonder if we should remove this policy altogether 
> (deprecate it or use the update compatibility flag, so that the policy throws 
> an exception in the new version).
>
> The crucial point would be to find a use-case where it is actually helpful to 
> use such a policy.
> Any ideas?
>
>  Jan
>
> [1] https://lists.apache.org/thread/29r3zv04n4ooq68zzvpw6zm1185n59m2
>
> On 10/27/23 18:33, Alexey Romanenko wrote:
>
> Ahh, ok, I see.
>
> Yes, it looks like a bug. So, I'd propose to deprecate the old “processing 
> time” watermark policy, which we can remove later, and create a new fixed one.
>
> PS: It’s recommended to use “org.apache.beam.sdk.io.aws2.kinesis.KinesisIO” 
> instead of the deprecated “org.apache.beam.sdk.io.kinesis.KinesisIO” one.
>
> —
> Alexey
>
> On 27 Oct 2023, at 17:42, Jan Lukavský <je...@seznam.cz> wrote:
>
> No, I'm referring to this [1] policy, which has unexpected (and hardly 
> avoidable on the user-code side) data loss issues. The problem is that the 
> assignment of timestamps to elements and the watermark are completely 
> decoupled and unrelated, which I'd say is a bug.
>
>  Jan
>
> [1] 
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kinesis/KinesisIO.Read.html#withProcessingTimeWatermarkPolicy--
>
> On 10/27/23 16:51, Alexey Romanenko wrote:
>
> Why not just create a custom watermark policy for that? Or do you mean to 
> make it the default policy?
>
> —
> Alexey
>
> On 27 Oct 2023, at 10:25, Jan Lukavský <je...@seznam.cz> wrote:
>
>
> Hi,
>
> when discussing [1] we found out that the issue is actually caused by the 
> processing-time watermark policy in KinesisIO. Enabling this policy outputs 
> watermarks based on the current processing time, _but event timestamps are 
> derived from the ingestion timestamp_. This can cause unbounded lateness when 
> processing a backlog. I think this setup is error-prone and will likely cause 
> data loss due to dropped elements. It can be solved in two ways:
>
>  a) deprecate processing time watermarks, or
>
>  b) modify KinesisIO's watermark policy so that it assigns event timestamps 
> as well (the processing-time watermark policy would have to derive event 
> timestamps from processing time).
>
> I'd prefer option b), but it might be a breaking change. Moreover, I'm not 
> sure I understand the purpose of the processing-time watermark policy; it 
> might have been essentially ill-defined from the beginning, so it might 
> really be better to remove it completely. There is also a related issue [2].
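>
> For illustration, with the current behaviour the watermark can at least be 
> kept consistent with the (arrival-time) event timestamps by avoiding the 
> processing-time policy, e.g. (aws2 module; p is the Pipeline, the stream 
> name is illustrative):
>
>   import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO;
>
>   p.apply(
>       KinesisIO.read()
>           .withStreamName("my-stream")
>           // the arrival-time watermark tracks the same arrival timestamps
>           // that are assigned to the elements, whereas
>           // withProcessingTimeWatermarkPolicy() lets elements fall
>           // arbitrarily far behind the watermark when reading a backlog
>           .withArrivalTimeWatermarkPolicy());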
>
> Any thoughts on this?
>
>  Jan
>
> [1] https://github.com/apache/beam/issues/25975
>
> [2] https://github.com/apache/beam/issues/28760
>
>
>
