Hi Arnaud,

thanks for letting us know about your workaround. I agree that this is a frequently asked topic and important for certain use cases. I'm sure it will be solved in the near future, depending on priorities.

My 2 cents: Flink is an open source project, so maybe somebody is willing to work on a solution for one or more Flink sources :)

Regards,
Timo


On 1/2/18 at 4:50 PM, LINZ, Arnaud wrote:
Hi,

My 2 cents: not being able to programmatically stop a Flink stream gracefully is
the framework's biggest gap, IMHO. It's a very common use case: each time you want
to update the application or change its configuration, you need to stop
& restart it gracefully, without triggering alerts, losing data, or anything else.
That's why I never use the provided Flink sources "out of the box". I've built a framework
that encapsulates them, adding a monitoring thread that periodically checks for a special "hdfs
stop file" and tries to cancel() the source gracefully if the user requested a stop this way
(I've found the hdfs file trick to be the easiest way for an external application to reach all
task managers running on unknown hosts).
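For illustration, here is a minimal self-contained sketch of such a monitor thread. A local java.nio.file path stands in for the real HDFS check, and the source's cancel() is represented by a hypothetical callback -- this is the pattern from memory, not the actual framework code:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Polls for a "stop file" and cancels the wrapped source when it appears. */
class StopFileMonitor implements Runnable {
    private final Path stopFile;
    private final Runnable cancelCallback;   // e.g. source::cancel
    private final long pollIntervalMillis;
    private volatile boolean done = false;

    StopFileMonitor(Path stopFile, Runnable cancelCallback, long pollIntervalMillis) {
        this.stopFile = stopFile;
        this.cancelCallback = cancelCallback;
        this.pollIntervalMillis = pollIntervalMillis;
    }

    @Override
    public void run() {
        try {
            while (!done) {
                if (Files.exists(stopFile)) {
                    cancelCallback.run();    // ask the source to stop nicely
                    done = true;
                } else {
                    Thread.sleep(pollIntervalMillis);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        Path stop = Paths.get("stop.marker");
        final boolean[] cancelled = {false};
        Thread monitor = new Thread(new StopFileMonitor(stop, () -> cancelled[0] = true, 50));
        monitor.setDaemon(true);
        monitor.start();
        Files.createFile(stop);              // simulate an external stop request
        monitor.join(5000);
        Files.deleteIfExists(stop);
        System.out.println("cancelled=" + cancelled[0]);  // prints cancelled=true
    }
}
```

In the real version the Files.exists() check is replaced by an org.apache.hadoop.fs.FileSystem lookup on the shared HDFS path.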

I could not use the "special message" trick because in most real production
environments you cannot, as a client, post a message to a queue just for your client's
needs: you don't have the proper access rights to do so, and you don't know how other
clients connected to the same data may react to fake messages...

Unfortunately, most Flink sources cannot be cancelled gracefully without changing
part of their code. That's the case for the Kafka source:

- If a Kafka consumer source instance is not connected to any partition (because
its parallelism exceeds the number of partitions in the Kafka consumer group, for
instance), we end up in an infinite wait in FlinkKafkaConsumerBase.run() until the
thread is interrupted:

    // wait until this is canceled
    final Object waitLock = new Object();
    while (running) {
        try {
            //noinspection SynchronizationOnLocalVariableOrMethodParameter
            synchronized (waitLock) {
                waitLock.wait();
            }
        }
        catch (InterruptedException e) {
            if (!running) {
                // restore the interrupted state, and fall through the loop
                Thread.currentThread().interrupt();
            }
        }
    }

So either you change the code, or your monitoring thread interrupts the
source thread -- but that will trigger the HA mechanism, and the source instance
will be relaunched n times before failing.
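If one is willing to patch that code, a timed wait lets the loop recheck the running flag, and a cancel() that notifies the lock wakes the waiter immediately. A self-contained sketch of the pattern (not the actual Flink code):

```java
/** A wait loop that rechecks a volatile flag instead of blocking forever. */
class CancellableWait {
    private volatile boolean running = true;
    private final Object waitLock = new Object();

    void run() throws InterruptedException {
        synchronized (waitLock) {
            while (running) {
                // wake up every 100 ms to recheck the flag, instead of wait()ing forever
                waitLock.wait(100);
            }
        }
    }

    void cancel() {
        running = false;
        synchronized (waitLock) {
            waitLock.notifyAll();   // wake the waiter immediately
        }
    }

    public static void main(String[] args) throws Exception {
        CancellableWait w = new CancellableWait();
        Thread t = new Thread(() -> {
            try {
                w.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        Thread.sleep(50);
        w.cancel();             // no interruption needed: the thread ends on its own
        t.join(2000);
        System.out.println("terminated=" + !t.isAlive());  // prints terminated=true
    }
}
```

Since no InterruptedException is involved, the HA restart machinery is never triggered.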

- BTW, it's also the case with RMQSource, as "nextDelivery" in
RMQSource.run() is called without a timeout:
    @Override
    public void run(SourceContext<OUT> ctx) throws Exception {
        while (running) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();

So if no message arrives, the while (running) check is never reached and the source
cannot be cancelled without a hard interruption.
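If I recall correctly, the RabbitMQ client also offers a nextDelivery(long timeoutMillis) overload that returns null on timeout, which would let the loop recheck the flag. A self-contained analogue of that pattern, with a BlockingQueue standing in for the RabbitMQ consumer:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Consumes with a timeout so cancellation is noticed even when no message arrives. */
class TimedPollSource {
    private volatile boolean running = true;
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    int run() throws InterruptedException {
        int consumed = 0;
        while (running) {
            // stand-in for consumer.nextDelivery(100): returns null on timeout
            String delivery = queue.poll(100, TimeUnit.MILLISECONDS);
            if (delivery != null) {
                consumed++;
            }
            // the loop condition is re-evaluated even when the queue stays empty
        }
        return consumed;
    }

    void cancel() { running = false; }

    void offer(String msg) { queue.offer(msg); }

    public static void main(String[] args) throws Exception {
        TimedPollSource src = new TimedPollSource();
        final int[] total = {0};
        Thread t = new Thread(() -> {
            try {
                total[0] = src.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        src.offer("m1");
        src.offer("m2");
        Thread.sleep(300);
        src.cancel();           // source stops within one poll interval, no interrupt
        t.join(2000);
        System.out.println("consumed=" + total[0]);
    }
}
```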

Best regards,
Arnaud


-----Original Message-----
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: Friday, December 29, 2017 10:30
To: Eron Wright <eronwri...@gmail.com>
Cc: Ufuk Celebi <u...@apache.org>; Jaxon Hu <hujiaxu...@gmail.com>; user 
<user@flink.apache.org>; Aljoscha Krettek <aljos...@apache.org>
Subject: Re: How to stop FlinkKafkaConsumer and make job finished?

Yes, that sounds like what Jaxon is looking for. :-) Thanks for the pointer,
Eron.

– Ufuk

On Thu, Dec 28, 2017 at 8:13 PM, Eron Wright <eronwri...@gmail.com> wrote:
I believe you can extend the `KeyedDeserializationSchema` that you
pass to the consumer to check for end-of-stream markers.

https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.html#isEndOfStream-T-
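In essence, isEndOfStream lets the deserialization schema flag "stop after this record", and the consumer then finishes cleanly. A simplified, self-contained illustration of the idea (the interface here is a stand-in, not Flink's real one, which also receives topic/partition/offset arguments; the EOF marker value is made up):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for Flink's KeyedDeserializationSchema, to show isEndOfStream. */
interface SimpleSchema<T> {
    T deserialize(byte[] message);
    boolean isEndOfStream(T nextElement);
}

class EofAwareStringSchema implements SimpleSchema<String> {
    static final String EOF_MARKER = "##EOF##";   // hypothetical end-of-stream record

    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // once this returns true, the consumer stops fetching and the job can finish
        return EOF_MARKER.equals(nextElement);
    }
}

class ConsumerLoopDemo {
    /** Mimics the consumer loop: emit records until the schema flags end-of-stream. */
    static List<String> consume(List<byte[]> raw, SimpleSchema<String> schema) {
        List<String> out = new ArrayList<>();
        for (byte[] msg : raw) {
            String record = schema.deserialize(msg);
            if (schema.isEndOfStream(record)) {
                break;                      // stop cleanly: no exception, no cancel
            }
            out.add(record);
        }
        return out;
    }

    public static void main(String[] args) {
        List<byte[]> raw = new ArrayList<>();
        raw.add("a".getBytes(StandardCharsets.UTF_8));
        raw.add("b".getBytes(StandardCharsets.UTF_8));
        raw.add(EofAwareStringSchema.EOF_MARKER.getBytes(StandardCharsets.UTF_8));
        raw.add("c".getBytes(StandardCharsets.UTF_8));
        System.out.println(ConsumerLoopDemo.consume(raw, new EofAwareStringSchema()));
        // prints [a, b]
    }
}
```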

Eron

On Wed, Dec 27, 2017 at 12:35 AM, Ufuk Celebi <u...@apache.org> wrote:
Hey Jaxon,

I don't think it's possible to control this via the life-cycle
methods of your functions.

Note that Flink currently does not support graceful stop in a
meaningful manner and you can only cancel running jobs. What comes to
my mind to cancel on EOF:

1) Extend the Kafka consumer to stop emitting records after your EOF
record. Look at the flink-connector-kafka-base module. This is
probably not feasible without some work to get familiar with the code.
Just putting it out there.

2) Throw a "SuccessException" that fails the job. Easy, but not nice.
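For illustration, the trick looks roughly like this: throw a marker exception from the function when the EOF record is seen, and swallow exactly that exception around execute(). A self-contained sketch (in a real job the exception thrown from a user function typically surfaces wrapped, e.g. in a JobExecutionException, so you would inspect the cause chain):

```java
/** Marker exception used to tear the job down "successfully". */
class SuccessException extends RuntimeException {}

class SuccessExceptionDemo {
    /** Stand-in for env.execute(): runs the pipeline until the marker is thrown. */
    static void executeJob(Runnable pipeline) {
        pipeline.run();
    }

    public static void main(String[] args) {
        boolean finishedCleanly = false;
        try {
            executeJob(() -> {
                // ... process records ...
                throw new SuccessException();  // EOF record seen: abort the job on purpose
            });
        } catch (SuccessException e) {
            // expected: the job was torn down deliberately, treat it as success
            finishedCleanly = true;
        }
        System.out.println("finishedCleanly=" + finishedCleanly);  // prints finishedCleanly=true
    }
}
```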

3) Use an HTTP client and cancel your job via the HTTP endpoint

(https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#job-cancellation).
Easy, but not nice, since you need quite some logic in your function
(e.g. ignore records after the EOF record until cancellation, etc.).
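For reference, the cancellation endpoint in the linked docs is a DELETE on /jobs/:jobid/cancel. A self-contained sketch of the client side (a stub HTTP server stands in for the JobManager so the snippet runs without a cluster; the job id is made up):

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;

/** Cancels a job through the JobManager's monitoring REST API. */
class RestCancel {
    /** Sends DELETE /jobs/&lt;jobId&gt;/cancel and returns the HTTP status code. */
    static int cancelJob(String baseUrl, String jobId) throws IOException {
        URL url = new URL(baseUrl + "/jobs/" + jobId + "/cancel");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("DELETE");
        int code = conn.getResponseCode();
        conn.disconnect();
        return code;
    }

    public static void main(String[] args) throws IOException {
        // stub JobManager so the sketch is runnable without a cluster
        HttpServer stub = HttpServer.create(new InetSocketAddress(0), 0);
        stub.createContext("/jobs", exchange -> {
            exchange.sendResponseHeaders(200, -1);  // pretend the cancel succeeded
            exchange.close();
        });
        stub.start();
        String base = "http://localhost:" + stub.getAddress().getPort();
        int status = cancelJob(base, "deadbeefcafebabe0000000000000000");  // made-up job id
        stub.stop(0);
        System.out.println("status=" + status);  // prints status=200 (from the stub)
    }
}
```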

Maybe Aljoscha (cc'd) has an idea how to do this in a better way.

– Ufuk


On Mon, Dec 25, 2017 at 8:59 AM, Jaxon Hu <hujiaxu...@gmail.com> wrote:
I would like to stop FlinkKafkaConsumer consuming data from kafka
manually.
But I find it won't be close when I invoke "cancel()" method. What
I am trying to do is add an EOF symbol meaning the end of my kafka
data, and when the FlatMap operator read the symbol it will invoke
FlinkKafkaConsumer "cancel()" method. It doesn't work. Flink
streaming job won't finish unless it get canceled or failed, when I
use kafka as source.

Somebody knowing  gives me some help, thx~~

________________________________


The integrity of this message cannot be guaranteed on the Internet. The company 
that sent this message cannot therefore be held liable for its content nor 
attachments. Any unauthorized use or dissemination is prohibited. If you are 
not the intended recipient of this message, then please delete it and notify 
the sender.

