Hi Juan,

I just replied to your other question, but I think I now better understand
where you are coming from.

Are you aware of per-partition watermarking [1]? With it, you don't need to
manage this map yourself. BUT: this does not solve the problem that the map
is not stored in managed state. Watermarks are generally not part of Flink's
state, and it seems like that is what you are looking for, right?
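
To illustrate why losing the map hurts, here is a minimal stdlib-only Java
sketch of the kind of partition -> watermark map you describe. The class and
method names are hypothetical, and this is neither your code nor Flink's
per-partition implementation. The consensus watermark is the minimum over
the partitions in the map. Because the map only lives in memory, a lagging
partition that has not reported since a restart simply drops out of the
minimum.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified per-partition watermark tracker.
// Not Flink's internal implementation; it only illustrates the idea.
class PartitionWatermarks {

    // Latest watermark per partition; held only in memory, not in Flink state.
    private final Map<Integer, Long> watermarks = new HashMap<>();

    // Record a watermark for a partition; a partition's watermark never moves back.
    void update(int partition, long watermark) {
        watermarks.merge(partition, watermark, Math::max);
    }

    // Consensus watermark: the minimum over all partitions seen so far.
    long current() {
        return watermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        PartitionWatermarks beforeRestart = new PartitionWatermarks();
        beforeRestart.update(1, 1_000L);
        beforeRestart.update(2, 1_200L);
        beforeRestart.update(3, 100L); // partition 3 is lagging behind
        System.out.println(beforeRestart.current()); // prints 100

        // After a restart the in-memory map starts empty. If partition 3 has
        // not produced a record yet, it is missing from the minimum.
        PartitionWatermarks afterRestart = new PartitionWatermarks();
        afterRestart.update(1, 1_000L);
        afterRestart.update(2, 1_200L);
        System.out.println(afterRestart.current()); // prints 1000
    }
}
```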

To also answer your question: you could use List<Entry<Partition,
Watermark>> state with union redistribution. In this case, every operator
instance receives all entries during recovery, and you can keep only the
ones relevant to the current instance by checking which partitions it is
subscribed to after recovery.
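
A stdlib-only Java model of the difference between even-split and union
redistribution when shrinking from 3 to 2 instances, mirroring your example.
The Entry type, the "partition % parallelism" assignment and the chunking in
evenSplit are simplifying assumptions, not what Flink or the Kafka connector
do internally. In a real job you would request union redistribution with
getUnionListState (instead of getListState) on the OperatorStateStore inside
CheckpointedFunction#initializeState.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Simplified, hypothetical model of operator-state redistribution on rescaling.
// It does not call Flink APIs; it only mimics the two redistribution schemes.
class RedistributionDemo {

    // One checkpointed entry: a Kafka partition and its last watermark.
    static final class Entry {
        final int partition;
        final long watermark;

        Entry(int partition, long watermark) {
            this.partition = partition;
            this.watermark = watermark;
        }
    }

    // Hypothetical assignment: partition p is read by subtask p % parallelism.
    // (The real Kafka connector uses its own assignment function.)
    static boolean isAssigned(int partition, int subtask, int parallelism) {
        return partition % parallelism == subtask;
    }

    // Even-split: all entries are concatenated and cut into contiguous chunks,
    // without knowing which partitions each subtask will read.
    static List<List<Entry>> evenSplit(List<Entry> all, int parallelism) {
        List<List<Entry>> result = new ArrayList<>();
        int base = all.size() / parallelism;
        int extra = all.size() % parallelism;
        int start = 0;
        for (int i = 0; i < parallelism; i++) {
            int size = base + (i < extra ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // Union: every subtask receives every entry, then keeps only its own partitions.
    static List<List<Entry>> unionThenFilter(List<Entry> all, int parallelism) {
        List<List<Entry>> result = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            final int subtask = i;
            result.add(all.stream()
                    .filter(e -> isAssigned(e.partition, subtask, parallelism))
                    .collect(Collectors.toList()));
        }
        return result;
    }

    // Helper: which partitions' entries ended up on each subtask.
    static List<List<Integer>> partitions(List<List<Entry>> perSubtask) {
        return perSubtask.stream()
                .map(l -> l.stream().map(e -> e.partition).collect(Collectors.toList()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // State from the old job with parallelism 3: one entry per partition.
        List<Entry> checkpointed = List.of(
                new Entry(1, 1_000L), new Entry(2, 1_200L), new Entry(3, 100L));
        int newParallelism = 2;

        // With p % 2, subtask 0 reads partition 2; subtask 1 reads partitions 1 and 3.
        System.out.println(partitions(evenSplit(checkpointed, newParallelism)));
        // prints [[1, 2], [3]]: subtask 0 holds partition 1's entry but does not read it
        System.out.println(partitions(unionThenFilter(checkpointed, newParallelism)));
        // prints [[2], [1, 3]]: each subtask holds exactly the partitions it reads
    }
}
```

The trade-off is that union redistribution sends every entry to every
instance on restore, which is cheap here because there is only one entry
per partition.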

Hope this helps.

Cheers,

Konstantin


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission

On Wed, Jul 3, 2019 at 6:04 PM Juan Gentile <j.gent...@criteo.com> wrote:

> Hello!
>
> We currently have a job which reads from Kafka and uses punctuated
> watermarks based on the messages we read. We keep track of the watermark
> of each partition and emit a consensus watermark, taking the smallest of
> all partitions.
>
> We ran into an issue because we do not store this partition -> watermark
> map in state: one of the partitions got delayed, the job restarted, lost
> track of that partition and emitted a watermark anyway.
>
> Our idea for a solution is to save this partition -> watermark map in
> state, but we would like to know how Flink behaves when we decrease the
> parallelism, to make sure that the instance that reads a given Kafka
> partition also has the state for that partition.
>
> To give an example:
>
> Operator 1: (Reads Partition1)
> Partition 1: Watermark1 (Map / State)
>
> Operator 2: (Reads Partition2)
> Partition 2: Watermark2 (Map / State)
>
> Operator 3: (Reads Partition3)
> Partition 3: Watermark3 (Map / State)
>
> After shrinking:
>
> Operator 1: (Reads Partition1)
> Partition 1: Watermark1 (Map / State)
>
> Operator 2: (Reads Partition2, Partition3)
> Partition 2: Watermark2 (Map / State)
> Partition 3: Watermark3 (Map / State)
>
> Or:
>
> Operator 1: (Reads Partition1, Partition3) => HERE we would have a
> problem, as the state for Partition 3 could be loaded on the other operator.
> Partition 1: Watermark1 (Map / State)
>
> Operator 2: (Reads Partition2)
> Partition 2: Watermark2 (Map / State)
> Partition 3: Watermark3 (Map / State)
>
> For this we are using managed operator state (
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-operator-state)
> with “*Even-split redistribution*”.
>
> Could you please help us understand how Flink behaves in such a
> scenario?
>
> Thank you,
>
> Juan G.
>


-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525


Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2019


--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
