Hi Tamir,

Sorry, I missed that you want to use Kafka. In that case I would suggest
trying out the new KafkaSource [1] interface and its built-in boundedness
support [2][3]. Maybe it will do the trick? If you want to be notified
explicitly about the completion of such a bounded Kafka stream, you can
still use the `Watermark#MAX_WATERMARK` trick mentioned above.
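
To make that concrete, here is a minimal sketch of building a bounded
KafkaSource (broker address, topic, and group id below are placeholders, not
taken from this thread; this is illustrative, not a drop-in implementation):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedKafkaExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // placeholder
                .setTopics("input-topic")             // placeholder
                .setGroupId("my-group")               // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                // setBounded makes the source finish once it has read up to the
                // given offsets; here: the latest offsets at job start time.
                .setBounded(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(
                source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        stream.print();
        env.execute("Bounded Kafka job");
    }
}
```

Once the source reaches the bounded offsets it finishes, emits
MAX_WATERMARK downstream, and the job terminates after all records are
processed.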

If not, can you let us know what is not working?

Best,
Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-source
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#boundedness
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.html#setBounded-org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer-
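
P.S. The `Watermark#MAX_WATERMARK` trick mentioned above can be sketched
roughly like this (class and message strings are illustrative, not part of
your job):

```java
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;

// Sketch: register an event-time timer that can only fire once the (bounded)
// source has finished and emitted MAX_WATERMARK.
public class EndOfStreamNotifier extends KeyedProcessFunction<String, String, String> {

    @Override
    public void processElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        // Re-registering the same timestamp is cheap: Flink keeps at most one
        // timer per key and timestamp.
        ctx.timerService()
           .registerEventTimeTimer(Watermark.MAX_WATERMARK.getTimestamp() - 1);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        // The watermark has advanced to MAX_WATERMARK, i.e. this function has
        // seen all records for this key; trigger your notification here.
        System.out.println("Bounded stream finished for key " + ctx.getCurrentKey());
    }
}
```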


On Wed, Jul 14, 2021 at 11:59 Tamir Sagi <tamir.s...@niceactimize.com>
wrote:

> Hey Piotr,
>
> Thank you for your response.
>
> I saw the exact same suggestion in an answer by David Anderson [1] but did
> not really understand how it would help.
>
> Sources, when finishing, emit
> {{org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK}}
>
> Assuming 10 messages are sent to a Kafka topic, processed, and saved into the DB:
>
>    1. Kafka is not considered a finite source; after the 10th element it
>    will wait for more input, no?
>    2. In such a case, will the 10th element be marked with MAX_WATERMARK or
>    not? Or only at some point in the future?
>
> Now, let's say the 10th element is marked with MAX_WATERMARK. How will I
> know when all elements have been saved into the DB?
>
> Here is the execution graph:
> Source(Kafka) --> Operator 1 --> Operator 2 --> Sink(PostgreSQL)
>
> Would you please elaborate on the event-time timer function? Where exactly
> would it be integrated into the aforementioned execution graph?
>
> Another question I have, based on our discussion: if the only thing that
> changes is the source, and apart from that the entire flow is the same
> (operators and sink), is there any good practice for achieving that with a
> single job?
>
> Tamir.
>
> [1]
> https://stackoverflow.com/questions/54687372/flink-append-an-event-to-the-end-of-finite-datastream#answer-54697302
> ------------------------------
> *From:* Piotr Nowojski <pnowoj...@apache.org>
> *Sent:* Tuesday, July 13, 2021 4:54 PM
> *To:* Tamir Sagi <tamir.s...@niceactimize.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Process finite stream and notify upon completion
>
>
> *EXTERNAL EMAIL*
>
>
> Hi,
>
> Sources, when finishing, emit
> {{org.apache.flink.streaming.api.watermark.Watermark#MAX_WATERMARK}}, so I
> think the best approach is to register an event-time timer for
> {{Watermark#MAX_WATERMARK}} or maybe {{Watermark#MAX_WATERMARK - 1}}. If
> your function registers such a timer, it will be processed after that
> function has processed all of the records (keep in mind Flink is a
> distributed system, so downstream operators/functions might still be busy
> for some time processing the last records while upstream
> operators/functions have already finished).
>
> Alternatively, you can also implement a custom operator that implements the
> {{BoundedOneInput}} interface [1]. It would work in the same way, but
> implementing a custom operator is more difficult, only semi-officially
> supported, and not well documented.
>
> Best,
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/operators/BoundedOneInput.html
>
> On Mon, Jul 12, 2021 at 12:44 Tamir Sagi <tamir.s...@niceactimize.com>
> wrote:
>
> Hey Community,
>
> I'm working on a streaming job that should aggregate bounded data and
> notify upon completion. (It works in batch mode; however, I'm trying to
> achieve the same results in streaming mode, if possible.)
>
> Source: Kafka
> Sink: PostgreSQL
>
> *I'm looking for an elegant way to notify upon completion.*
>
> One solution I have in mind (not perfect, but it might work):
>
>    1. Send a message to a topic for every record which is successfully
>    saved into the DB (from the sink).
>    2. Consume those messages externally to the cluster.
>    3. If no message is consumed for a fixed time, assume the process has
>    finished.
>
> I was also wondering if an event-time window with a custom trigger and an
> AggregateFunction might help me here.
> However, I could not find a way to detect when all records have been
> processed within the window.
>
> I'd prefer a Flink-based solution if one exists.
>
> Various References
> flink-append-an-event-to-the-end-of-finite-datastream
> <https://stackoverflow.com/questions/54687372/flink-append-an-event-to-the-end-of-finite-datastream#answer-54697302>
> how-can-i-know-that-i-have-consumed-all-of-a-kafka-topic
> <https://stackoverflow.com/questions/48427775/how-can-i-know-that-i-have-consumed-all-of-a-kafka-topic>
>
> Best,
>
> Tamir.
>
>
> Confidentiality: This communication and any attachments are intended for
> the above-named persons only and may be confidential and/or legally
> privileged. Any opinions expressed in this communication are not
> necessarily those of NICE Actimize. If this communication has come to you
> in error you must take no action based on it, nor must you copy or show it
> to anyone; please delete/destroy and inform the sender by e-mail
> immediately.
> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> Viruses: Although we have taken steps toward ensuring that this e-mail and
> attachments are free from any virus, we advise that in keeping with good
> computing practice the recipient should ensure they are actually virus free.
>
>
