Hi Tamir,

Ok, let's take a step back. First of all, let's assume we already have a
bounded source. When this source ends, it emits MAX_WATERMARK shortly
before closing and ending the stream. That MAX_WATERMARK then flows
through your job graph, firing all remaining event time timers. The
question is: do you need to register a timer at all? Maybe it's good
enough for you to simply check that your job has finished. In other
words, what do you mean by:

> *I'm looking for an elegant way to notify upon completion.*
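If checking for completion from outside the pipeline is enough, a bounded job already gives you that: `StreamExecutionEnvironment#execute` blocks until every task, including the sink, has finished. A minimal sketch (the bootstrap servers, topic name and the final notification step are hypothetical placeholders):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // placeholder
                .setTopics("input-topic")                // placeholder
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // makes the source bounded: stop once the offsets seen at startup are reached
                .setBounded(OffsetsInitializer.latest())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka").print();

        env.execute("bounded-job");
        // Reaching this line means the whole job, sink included, has finished.
        System.out.println("job finished");
    }
}
```

Note that this only works from the submitting program in attached mode; it does not help if the notification must happen from inside the job itself.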

If, for some reason, you want to execute code inside Flink that runs
AFTER all records have been written to your sink, you will hit the
problem I meant by:

> keep in mind Flink is a distributed system so downstream
operators/functions might still be busy for some time processing last
records, while upstream operators/functions are already finished).

If you have a job with three different tasks, Source -> Task1 -> Sink,
keep in mind that between tasks there will be some data buffered for the
network exchanges. This means that if you register an end-of-event-time
timer (waiting for `MAX_WATERMARK`) in, for example, Task1, this timer
might fire (according to the wall clock) before the last records buffered
between Task1 and the Sink have been processed.
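As a sketch of what such a timer registration could look like (hedged example: `Event` is a placeholder type, and the function must run on a keyed stream for timers to be available):

```java
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;

// Sketch only: Event is a placeholder element type.
public class EndOfInputDetector<Event> extends KeyedProcessFunction<String, Event, Event> {

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) {
        // Registering the same timestamp repeatedly per key is deduplicated.
        // A timer at MAX_WATERMARK - 1 can only fire once the final
        // MAX_WATERMARK has reached THIS operator.
        ctx.timerService().registerEventTimeTimer(Watermark.MAX_WATERMARK.getTimestamp() - 1);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) {
        // All records have passed this operator, but downstream
        // operators (and buffered network data) may still be in flight.
    }
}
```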

This goes one step further. Even if you register your timer in the same
task as your sink function, if there is an operator chain in that Sink
task that looks as follows:

FunctionWithYourEndTimeTimer ---- (chained) ---> WindowOperator ---
(chained) ----> SinkFunction

Even if those three operators/functions are chained and there are no
buffered records between them, `WindowOperator` buffers records
internally, so the same problem can happen again: the timer from
`FunctionWithYourEndTimeTimer` will fire before the last records buffered
inside `WindowOperator` are flushed to the `SinkFunction`.

So you would need to have:

WindowOperator --- (chained) ----> FunctionWithYourEndTimeTimer ----
(chained) ---> SinkFunction
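In DataStream API terms, that ordering could look roughly like this (hedged sketch: `MyAggregate`, `MySink`, `FunctionWithYourEndTimeTimer` and the key selectors are placeholders, and the process function needs a keyed stream to use timers):

```java
// Place the end-of-time timer function AFTER the window operator and
// right before the sink, so the timer fires only once the window's
// internal buffers have been flushed.
DataStream<Result> aggregated = input
        .keyBy(e -> e.getKey())
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .aggregate(new MyAggregate());           // WindowOperator: buffers internally

aggregated
        .keyBy(r -> r.getKey())                  // timers require a keyed stream
        .process(new FunctionWithYourEndTimeTimer())
        .addSink(new MySink());
```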

But even that might not solve your problem, as most likely the
SinkFunction buffers some records in one way or another, and maybe you
want to execute code only after those records have been committed/flushed
to the external system. To fully solve this you would need FLIP-143 [1]
sinks and their GlobalCommitter [2], combined with the work-in-progress
FLIP-147 [3].
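As a rough illustration of where FLIP-143 would hook in, here is a sketch against the 1.13 `org.apache.flink.api.connector.sink.GlobalCommitter` interface; the `String` committable type and the `notifyExternalSystem()` hook are placeholders:

```java
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.connector.sink.GlobalCommitter;

// Sketch only: String stands in for a real committable type, and
// notifyExternalSystem() is a hypothetical notification hook.
public class NotifyingGlobalCommitter implements GlobalCommitter<String, String> {

    @Override
    public List<String> filterRecoveredCommittables(List<String> committables) {
        return committables;
    }

    @Override
    public String combine(List<String> committables) {
        return String.join(",", committables);
    }

    @Override
    public List<String> commit(List<String> committables) {
        // commit to the external system; return the committables that need a retry
        return Collections.emptyList();
    }

    @Override
    public void endOfInput() {
        // Invoked once all input has been processed and committed,
        // which is the natural place to signal completion.
        notifyExternalSystem();
    }

    @Override
    public void close() {}

    private void notifyExternalSystem() { /* hypothetical */ }
}
```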

Best,
Piotrek

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/connector/sink/class-use/GlobalCommitter.html
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished

Wed, 14 Jul 2021 at 15:26 Timo Walther <twal...@apache.org> wrote:

> Hi Tamir,
>
> a nice property of watermarks is that they are kind of synchronized
> across input operators and their partitions (i.e. parallel instances).
> Bounded sources will emit a final MAX_WATERMARK once they have processed
> all data. When you receive a MAX_WATERMARK in your current operator, you
> can be sure that all data has been processed upstream. And all records
> have arrived at your operator's parallel instance.
>
> Regards,
> Timo
>
>
> On 14.07.21 15:05, Tamir Sagi wrote:
> > Hey Piotr,
> >
> > Thank you for fast response,
> >
> > The refs are good; however, to be honest, I'm a little confused
> > regarding the trick with MAX_WATERMARK. Maybe I'm missing something.
> >
> >     keep in mind Flink is a distributed system so
> >     downstream operators/functions might still be busy for some time
> >     processing last records, while upstream operators/functions are
> >     already finished).
> >
> > Based on your suggestion and ref [1], I'm trying to understand how
> > MAX_WATERMARK could be useful in such a scenario if it might be
> > processed before the {MAX_WATERMARK - 1} timer fires.
> >
> > Following [2], MAX_WATERMARK = The watermark that signifies
> > end-of-event-time.
> >
> > Thank you,
> >
> > Tamir.
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/concepts/time/#notions-of-time-event-time-and-processing-time
> >
> > [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/api/watermark/Watermark.html#MAX_WATERMARK
> >
> >
> > ------------------------------------------------------------------------
> > *From:* Piotr Nowojski <pnowoj...@apache.org>
> > *Sent:* Wednesday, July 14, 2021 1:36 PM
> > *To:* Tamir Sagi <tamir.s...@niceactimize.com>
> > *Cc:* user@flink.apache.org
> > *Subject:* Re: Process finite stream and notify upon completion
> >
> > Hi Tamir,
> >
> > Sorry, I missed that you want to use Kafka. In that case I would suggest
> > trying out the new KafkaSource [1] interface and its built-in boundedness
> > support [2][3]. Maybe it will do the trick? If you want to be notified
> > explicitly about the completion of such a bounded Kafka stream, you
> > can still use this `Watermark#MAX_WATERMARK` trick mentioned above.
> >
> > If not, can you let us know what is not working?
> >
> > Best,
> > Piotrek
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-source
> > [2]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#boundedness
> > [3]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html#setBounded-org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer-
> >
> >
> > Wed, 14 Jul 2021 at 11:59 Tamir Sagi <tamir.s...@niceactimize.com> wrote:
> >
> >     Hey Piotr,
> >
> >     Thank you for your response.
> >
> >     I saw the exact suggestion answer by David Anderson [1] but did not
> >     really understand how it may help.
> >
> >         Sources when finishing are emitting
> >
>  {{org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK}}
> >
> >     Assuming 10 messages are sent to Kafka topic , processed and saved
> >     into DB
> >
> >      1. Kafka is not considered a finite source, after the 10th element
> >         it will wait for more input, no?
> >      2. In such case, the 10th element will be marked with MAX_WATERMARK
> >         or not? or at some point in the future?
> >
> >     Now, Let's say the 10th element will be marked with MAX_WATERMARK,
> >     How will I know when all elements have been saved into DB?
> >
> >     Here is the execution Graph
> >     Source(Kafka) --> Operator --> Operator 2 --> Sink(PostgreSQL)
> >
> >     Would you please elaborate on the event time function? Where
> >     exactly would it be integrated into the aforementioned execution
> >     graph?
> >
> >     Another question, based on our discussion: if the only thing
> >     that changes is the source, while the entire flow stays the
> >     same (operators and sink), is there any good practice for
> >     achieving that with a single job?
> >
> >     Tamir.
> >
> >     [1]
> >
> https://stackoverflow.com/questions/54687372/flink-append-an-event-to-the-end-of-finite-datastream#answer-54697302
> >
>  ------------------------------------------------------------------------
> >     *From:* Piotr Nowojski <pnowoj...@apache.org>
> >     *Sent:* Tuesday, July 13, 2021 4:54 PM
> >     *To:* Tamir Sagi <tamir.s...@niceactimize.com>
> >     *Cc:* user@flink.apache.org
> >     *Subject:* Re: Process finite stream and notify upon completion
> >
> >     Hi,
> >
> >     Sources when finishing are emitting
> >
>  {{org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK}}, so
> >     I think the best approach is to register an even time timer for
> >     {{Watermark#MAX_WATERMARK}} or maybe {{Watermark#MAX_WATERMARK -
> >     1}}. If your function registers such a timer, it would be processed
> >     after processing all of the records by that function (keep in mind
> >     Flink is a distributed system so downstream operators/functions
> >     might still be busy for some time processing last records, while
> >     upstream operators/functions are already finished).
> >
> >     Alternatively you can also implement a custom operator that
> >     implements {{BoundedOneInput}} interface [1], it would work in the
> >     same way, but implementing a custom operator is more difficult, only
> >     semi officially supported and not well documented.
> >
> >     Best,
> >     Piotrek
> >
> >     [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/operators/BoundedOneInput.html
> >
> >     Mon, 12 Jul 2021 at 12:44 Tamir Sagi <tamir.s...@niceactimize.com> wrote:
> >
> >         Hey Community,
> >
> >         I'm working on a stream job that should aggregate a bounded data
> >         and notify upon completion. (It works in Batch mode; however,
> >         I'm trying to achieve the same results in Stream mode, if
> possible).
> >
> >         Source: Kafka
> >         Sink: PostgresDB
> >
> >         *I'm looking for an elegant way to notify upon completion.*
> >
> >         One solution I have in mind (Not perfect but might work)
> >
> >          1. Send a message to a topic for every record successfully
> >             saved into the DB (from the sink)
> >          2. Consume those messages externally to the cluster
> >          3. If no message is consumed for a fixed time, assume the
> >             process has finished.
> >
> >         I was also wondering whether a TimeEventWindow with a custom
> >         trigger and AggregationFunction might help me here.
> >         However, I could not find a way to detect when all records
> >         have been processed within the window.
> >
> >         I'd go with a Flink-based solution if one exists.
> >
> >         Various References
> >         flink-append-an-event-to-the-end-of-finite-datastream
> >         <https://stackoverflow.com/questions/54687372/flink-append-an-event-to-the-end-of-finite-datastream#answer-54697302>
> >         how-can-i-know-that-i-have-consumed-all-of-a-kafka-topic
> >         <https://stackoverflow.com/questions/48427775/how-can-i-know-that-i-have-consumed-all-of-a-kafka-topic>
> >
> >         Best,
> >
> >         Tamir.
> >
> >
> >         Confidentiality: This communication and any attachments are
> >         intended for the above-named persons only and may be
> >         confidential and/or legally privileged. Any opinions expressed
> >         in this communication are not necessarily those of NICE
> >         Actimize. If this communication has come to you in error you
> >         must take no action based on it, nor must you copy or show it to
> >         anyone; please delete/destroy and inform the sender by e-mail
> >         immediately.
> >         Monitoring: NICE Actimize may monitor incoming and outgoing
> e-mails.
> >         Viruses: Although we have taken steps toward ensuring that this
> >         e-mail and attachments are free from any virus, we advise that
> >         in keeping with good computing practice the recipient should
> >         ensure they are actually virus free.
> >
> >
> >
> >
> >
>
>
