Hi,
I think you could try implementing the `CheckpointedFunction` interface:
`FunctionInitializationContext.isRestored()` tells you whether the function is
being restored from a checkpoint or savepoint.
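
A rough plain-Java model of that idea, combined with Piotr's suggestion below of re-emitting the last checkpointed watermark. This is NOT Flink code. The method names only mirror `CheckpointedFunction#snapshotState` / `initializeState` and `isRestored()`. The class and `SnapshotStore` are hypothetical stand-ins for operator state. Also note that in a real job a plain user function cannot emit watermarks, so this logic would have to live in a custom operator.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (NOT Flink's API) of an operator that remembers the last
// watermark in checkpointed state and re-emits it after a restore.
class WatermarkRememberingModel {
    static final long MIN_WATERMARK = Long.MIN_VALUE;

    // Hypothetical stand-in for list-style operator state that survives a savepoint.
    static final class SnapshotStore {
        final List<Long> entries = new ArrayList<>();
    }

    private long lastWatermark = MIN_WATERMARK;
    private final List<Long> emitted = new ArrayList<>();

    // Called for every incoming watermark: remember it and forward it downstream.
    void processWatermark(long watermark) {
        lastWatermark = Math.max(lastWatermark, watermark);
        emitted.add(watermark);
    }

    // Mirrors snapshotState(): persist the last seen watermark.
    void snapshotState(SnapshotStore store) {
        store.entries.clear();
        store.entries.add(lastWatermark);
    }

    // Mirrors initializeState(); isRestored plays the role of
    // FunctionInitializationContext.isRestored().
    void initializeState(SnapshotStore store, boolean isRestored) {
        if (!isRestored || store.entries.isEmpty()) {
            return; // fresh start: nothing to re-emit
        }
        // If several entries come back (e.g. after rescaling), take the minimum
        // so the re-emitted watermark never overtakes any previous subtask.
        long restored = Long.MAX_VALUE;
        for (long wm : store.entries) {
            restored = Math.min(restored, wm);
        }
        if (restored > MIN_WATERMARK) {
            lastWatermark = restored;
            emitted.add(restored); // re-emit instead of waiting for the source
        }
    }

    List<Long> emitted() {
        return emitted;
    }

    long lastWatermark() {
        return lastWatermark;
    }

    public static void main(String[] args) {
        WatermarkRememberingModel beforeSavepoint = new WatermarkRememberingModel();
        beforeSavepoint.processWatermark(5_000L);
        SnapshotStore savepoint = new SnapshotStore();
        beforeSavepoint.snapshotState(savepoint);

        WatermarkRememberingModel afterRestore = new WatermarkRememberingModel();
        afterRestore.initializeState(savepoint, true);
        System.out.println("re-emitted after restore: " + afterRestore.emitted());
    }
}
```

Gating the re-emit on `isRestored()` keeps a fresh start from emitting a stale watermark. The trade-off is that a restored job temporarily trusts a watermark taken before the savepoint.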

BTW: I am not quite sure about your scenario, but maybe you could try setting
the idleness configuration [1].
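
In Flink 1.12, idleness is set through `WatermarkStrategy#withIdleness(Duration)`. The sketch below is only a plain-Java model (not Flink's implementation) of why it can help here. The combined watermark of a two-input operator is the minimum over its inputs. So a silent input (e.g. the static rules stream) stuck at `Long.MIN_VALUE` holds everything back until it is marked idle and excluded. The class and method names are hypothetical.

```java
import java.util.Arrays;

// Plain-Java model (NOT Flink's implementation) of how a multi-input operator
// combines watermarks, and how marking an input idle unblocks it.
class IdleAwareWatermarkModel {
    private final long[] perInput;
    private final boolean[] idle;
    private long lastCombined = Long.MIN_VALUE;

    IdleAwareWatermarkModel(int numInputs) {
        perInput = new long[numInputs];
        idle = new boolean[numInputs];
        // After a (re)start every input begins at MIN_WATERMARK.
        Arrays.fill(perInput, Long.MIN_VALUE);
    }

    // A watermark arriving on an input also makes that input active again.
    void onWatermark(int input, long watermark) {
        idle[input] = false;
        perInput[input] = Math.max(perInput[input], watermark);
    }

    // Roughly what an idleness timeout does for an input that sends nothing.
    void markIdle(int input) {
        idle[input] = true;
    }

    // Combined watermark = minimum over active inputs; it never moves backwards.
    long combined() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < perInput.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, perInput[i]);
            }
        }
        if (anyActive) {
            lastCombined = Math.max(lastCombined, min);
        }
        return lastCombined;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkModel coProcess = new IdleAwareWatermarkModel(2);
        coProcess.onWatermark(0, 10_000L); // Stream1 keeps producing watermarks
        // prints -9223372036854775808: the silent rules input holds it back
        System.out.println("rules input silent: " + coProcess.combined());
        coProcess.markIdle(1); // rules input marked idle after the timeout
        // prints 10000: the idle input is excluded from the minimum
        System.out.println("rules input idle:   " + coProcess.combined());
    }
}
```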

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission

Best,
Guowei


On Fri, Mar 5, 2021 at 2:19 AM bat man <tintin0...@gmail.com> wrote:

> Thanks Piotr. Got it. I had to push the static rules to the Kafka topic
> again, as they had expired and were archived from the topic. After that, the
> pipeline resumed.
> Regarding your suggestion of implementing an operator that remembers the
> watermark: is there any indicator that the job has been resumed, which I can
> use to emit the watermark when the job is restored from a savepoint?
>
> On Thu, Mar 4, 2021 at 8:46 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi Hemant,
>>
>> The latest seen watermarks are not persisted in the operators' state.
>> Currently the DataStream API assumes that after recovery, watermarks are
>> going to be re-emitted sooner or later. What probably happens is that one
>> of your sources emitted watermarks (maybe a very high one, or even
>> `MAX_WATERMARK`) before the savepoint was taken, and then stopped emitting
>> them. As long as the job is not restarted, this watermark is kept in
>> memory. However, after recovery all watermarks in the operators are reset
>> to `MIN_WATERMARK` (-9223372036854775808), and in your case the watermark
>> of one of the inputs of your `KeyedCoProcessFunction` is probably never
>> updated after the recovery (for operators/functions with multiple inputs,
>> the combined watermark is the minimum across all inputs).
>>
>> You would need to make sure, one way or another, that watermarks are
>> emitted after the recovery. As a last resort, you could probably
>> implement an operator that remembers the last checkpointed watermark in
>> its state and re-emits it upon recovery.
>>
>> Best,
>> Piotrek
>>
>> On Thu, Mar 4, 2021 at 3:43 PM bat man <tintin0...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I have a job whose source is Kafka. Stream1 partitions the data on a
>>> dynamic key and joins it with static rules (source: Kafka). I use a
>>> KeyedCoProcessFunction to join Stream1 with Stream2 (source: Kafka).
>>> Everything works fine in a normal run.
>>>
>>> To change the watermark generation interval, I stopped the job with a
>>> savepoint. When I restart the job from the savepoint, the watermark is
>>> stuck at -9223372036854775808.
>>> Because of this, the process function doesn't emit any results.
>>>
>>> What could be the problem?
>>>
>>> Thanks,
>>> Hemant
>>>
>>
