Hi Chengzhi,

You can access the current watermark from the Context object of a
ProcessFunction [1] and store it in operator state [2].
In case of a restart, the state will be restored with the watermark that
was active when the checkpoint (or savepoint) was taken. Note that this is
not the last watermark before the failure, only the one captured in that
checkpoint.
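
A minimal sketch of that idea, assuming the Flink 1.4 Scala/Java APIs. The
class name WatermarkTracker, the accessor lastSeenWatermark and the state
name "last-watermark" are made up for illustration, and taking the minimum
of the restored values is just one conservative choice in case the job is
rescaled:

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Hypothetical pass-through operator: forwards every record unchanged and
// remembers the watermark it last observed.
class WatermarkTracker[T] extends ProcessFunction[T, T] with CheckpointedFunction {

  // Operator (non-keyed) list state; assigned in initializeState().
  @transient private var checkpointed: ListState[java.lang.Long] = _

  // Long.MinValue means "no watermark observed yet".
  private var lastWatermark: Long = Long.MinValue

  def lastSeenWatermark: Long = lastWatermark

  override def processElement(value: T,
                              ctx: ProcessFunction[T, T]#Context,
                              out: Collector[T]): Unit = {
    // The operator's current watermark is exposed through the timer service.
    lastWatermark = ctx.timerService().currentWatermark()
    out.collect(value)
  }

  // Called on every checkpoint/savepoint: store the value as of the snapshot.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointed.clear()
    checkpointed.add(lastWatermark)
  }

  // Called on start and on restore from a checkpoint/savepoint.
  override def initializeState(context: FunctionInitializationContext): Unit = {
    checkpointed = context.getOperatorStateStore.getListState(
      new ListStateDescriptor[java.lang.Long]("last-watermark", classOf[java.lang.Long]))
    if (context.isRestored) {
      // After rescaling a subtask may receive zero or several entries;
      // keep the smallest one to stay on the safe side.
      val it = checkpointed.get().iterator()
      var restored = Long.MaxValue
      var found = false
      while (it.hasNext) {
        restored = math.min(restored, it.next().longValue())
        found = true
      }
      if (found) lastWatermark = restored
    }
  }
}
```

Something like stream.process(new WatermarkTracker[MyEvent]) placed after
the timestamp/watermark assigner should be enough to wire it in (MyEvent
is a placeholder for your record type).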

Best,
Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/state.html#operator-state

2018-04-03 19:39 GMT+02:00 Chengzhi Zhao <w.zhaocheng...@gmail.com>:

> Thanks Timo for your response and the references.
>
> I will try BoundedOutOfOrdernessTimestampExtractor, and if it doesn't work
> as expected, I will handle it as a separate pipeline.
> Also, is there a way to retrieve the last watermark before/after a failure?
> Then maybe I could persist the watermark to external storage and resume as
> a separate pipeline?
>
> Best,
> Chengzhi
>
>
> On Tue, Apr 3, 2018 at 7:58 AM, Timo Walther <twal...@apache.org> wrote:
>
>> Hi Chengzhi,
>>
>> If you emit a watermark even though there is still data with a lower
>> timestamp, you generate "late data" that either needs to be processed in a
>> separate branch of your pipeline (see sideOutputLateData() [1]) or should
>> force your existing operators to update their previously emitted results.
>> The latter means holding state or the contents of your windows longer (see
>> allowedLateness() [1]). I think in general a processing time watermark
>> strategy might not be suitable for reprocessing. Either you parameterize
>> your watermark generator such that you can pass information through job
>> parameters or you use another strategy such as
>> BoundedOutOfOrdernessTimestampExtractor [2] and sinks that allow
>> idempotent updates.
>>
>> I hope this helps.
>>
>> Regards,
>> Timo
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#windows
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html
>>
>> On 02.04.18 at 23:51, Chengzhi Zhao wrote:
>>
>> Hello Flink community,
>>
>> I am using periodic watermarks and extracting the event time from each
>> record in files on S3. I am using the `TimeLagWatermarkGenerator` as
>> described in the Flink documentation.
>>
>> Currently, each new watermark is generated from the processing time minus
>> a fixed lag:
>>
>> override def getCurrentWatermark: Watermark = {
>>     new Watermark(System.currentTimeMillis() - maxTimeLag)
>> }
>>
>> This works fine as long as the process is running. However, in case of a
>> failure, for example bad data or an out-of-memory error, I need to stop the
>> process, and it takes me time to get it running again. Suppose maxTimeLag
>> is 3 hours and it takes me 12 hours to notice and fix the problem.
>>
>> My question: since I am using processing time as part of the watermark,
>> will some records be ignored by the watermark when Flink resumes after the
>> failure? And what is the best practice to catch up and continue without
>> losing data? Thanks!
>>
>> Best,
>> Chengzhi
>>
>>
>>
>
