Hi Arnaud,
thanks for letting us know about your workaround. I agree that this is a
frequently asked topic and important for certain use cases. I'm sure it
will be addressed in the near future, depending on priorities.
My 2 cents: Flink is an open source project, so maybe somebody is willing
to work on a solution for one or more Flink sources :)
Regards,
Timo
On 1/2/18 at 4:50 PM, LINZ, Arnaud wrote:
Hi,
My 2 cents: not being able to programmatically stop a Flink stream gracefully is,
IMHO, what the framework lacks most. It's a very common use case: each time you want
to update the application or change its configuration, you need to stop & restart it
cleanly, without triggering alerts, losing data, or anything else.
That's why I never use the provided Flink sources "out of the box". I've built a
framework that encapsulates them, adding a monitoring thread that periodically checks
for a special "HDFS stop file" and tries to cancel() the source cleanly if the user
requested a stop by this means (I've found that the HDFS file trick is the easiest way
for an external application to reach all the task managers running on unknown hosts).
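For illustration, a minimal sketch of this kind of wrapper (the class name, polling
interval and stop-file path are hypothetical placeholders, not the actual framework
code, and error handling is omitted):

    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Wraps a delegate source and cancels it when a "stop file" appears on HDFS.
    public class StoppableSourceWrapper<T> implements SourceFunction<T> {

        private final SourceFunction<T> delegate;
        private final String stopFilePath; // e.g. "hdfs:///tmp/my-job/STOP" (hypothetical)

        public StoppableSourceWrapper(SourceFunction<T> delegate, String stopFilePath) {
            this.delegate = delegate;
            this.stopFilePath = stopFilePath;
        }

        @Override
        public void run(SourceContext<T> ctx) throws Exception {
            Thread monitor = new Thread(() -> {
                try {
                    Path stop = new Path(stopFilePath);
                    FileSystem fs = stop.getFileSystem(new Configuration());
                    while (!fs.exists(stop)) {
                        Thread.sleep(10_000L); // poll every 10 seconds
                    }
                    delegate.cancel(); // ask the wrapped source to stop cleanly
                } catch (Exception ignored) {
                    // monitoring is best-effort in this sketch
                }
            });
            monitor.setDaemon(true);
            monitor.start();
            delegate.run(ctx); // blocks until the delegate returns or is cancelled
        }

        @Override
        public void cancel() {
            delegate.cancel();
        }
    }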
I could not use the "special message" trick because in most real production
environments you cannot, as a client, post a message to a queue just for your own
client's needs: you don't have the proper access rights to do so, and you don't know
how other clients connected to the same data may react to fake messages...
Unfortunately, most Flink sources cannot be "cancelled" nicely without changing
part of their code. That's the case for the Kafka source.
- If a Kafka consumer source instance is not connected to any partition (for
instance because its parallelism level exceeds the number of partitions of the
Kafka consumer group), we end up in an infinite wait in FlinkKafkaConsumerBase.run()
until the thread is interrupted:
    // wait until this is canceled
    final Object waitLock = new Object();
    while (running) {
        try {
            //noinspection SynchronizationOnLocalVariableOrMethodParameter
            synchronized (waitLock) {
                waitLock.wait();
            }
        }
        catch (InterruptedException e) {
            if (!running) {
                // restore the interrupted state, and fall through the loop
                Thread.currentThread().interrupt();
            }
        }
    }
So either you change the code, or your monitoring thread interrupts the source
thread; but that will trigger the HA mechanism, and the source instance will be
relaunched n times before failing.
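For illustration only, a sketch of the kind of code change that would make this wait
cancellable without an interrupt (a hypothetical modification, not the actual Flink
code or an official fix):

    // Hypothetical variant: make the lock a field so cancel() can wake the idle source.
    private final Object waitLock = new Object();
    private volatile boolean running = true;

    // in run(), when the subtask has no partitions assigned:
    synchronized (waitLock) {
        while (running) {
            waitLock.wait(); // woken up by cancel(), no thread interrupt needed
        }
    }

    // in cancel():
    running = false;
    synchronized (waitLock) {
        waitLock.notifyAll(); // lets run() return cleanly
    }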
- BTW, it's also the case with RMQSource, as "nextDelivery" in RMQSource.run() is
called without a timeout:
    @Override
    public void run(SourceContext<OUT> ctx) throws Exception {
        while (running) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
So if no message arrives, the while (running) check is never reached and the source
cannot be cancelled without a hard interruption.
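A sketch of what a bounded wait could look like, assuming the RabbitMQ client's
QueueingConsumer.nextDelivery(long timeoutMillis) overload is available (it returns
null on timeout); this is not the actual RMQSource code:

    @Override
    public void run(SourceContext<OUT> ctx) throws Exception {
        while (running) {
            // bounded wait so the running flag is re-checked even when the queue is idle
            QueueingConsumer.Delivery delivery = consumer.nextDelivery(1000L);
            if (delivery == null) {
                continue; // timed out, loop back and check the running flag again
            }
            // ... deserialize and emit the delivery as in the original source ...
        }
    }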
Best regards,
Arnaud
-----Original Message-----
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: Friday, December 29, 2017 10:30
To: Eron Wright <eronwri...@gmail.com>
Cc: Ufuk Celebi <u...@apache.org>; Jaxon Hu <hujiaxu...@gmail.com>; user
<user@flink.apache.org>; Aljoscha Krettek <aljos...@apache.org>
Subject: Re: How to stop FlinkKafkaConsumer and make job finished?
Yes, that sounds like what Jaxon is looking for. :-) Thanks for the pointer
Eron.
– Ufuk
On Thu, Dec 28, 2017 at 8:13 PM, Eron Wright <eronwri...@gmail.com> wrote:
I believe you can extend the `KeyedDeserializationSchema` that you
pass to the consumer to check for end-of-stream markers.
https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.html#isEndOfStream-T-
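For example, a minimal sketch of such a schema (the string element type and the
"EOF" marker value are just placeholders for illustration):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

    import java.nio.charset.StandardCharsets;

    // Emits Kafka records as strings and signals end-of-stream on an "EOF" marker record.
    public class StringWithEofSchema implements KeyedDeserializationSchema<String> {

        @Override
        public String deserialize(byte[] messageKey, byte[] message,
                                  String topic, int partition, long offset) {
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            // once this returns true, the Kafka source stops and the job can finish
            return "EOF".equals(nextElement);
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
    }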
Eron
On Wed, Dec 27, 2017 at 12:35 AM, Ufuk Celebi <u...@apache.org> wrote:
Hey Jaxon,
I don't think it's possible to control this via the life-cycle
methods of your functions.
Note that Flink currently does not support graceful stop in a
meaningful manner and you can only cancel running jobs. What comes to
my mind to cancel on EOF:
1) Extend the Kafka consumer to stop emitting records after your EOF
record. Look at the flink-connector-kafka-base module. This is
probably not feasible and requires some work to get familiar with the
code. Just putting it out there.
2) Throw a "SuccessException" that fails the job. Easy, but not nice.
3) Use an HTTP client and cancel your job via the HTTP endpoint
(https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#job-cancellation).
Easy, but not nice, since you need quite some logic in your function
(e.g. ignoring records after the EOF record until cancellation, etc.).
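For option 3, a minimal sketch using plain java.net could look like this (it assumes
the JobManager's REST endpoint listens on jobmanager:8081 and that DELETE
/jobs/:jobid/cancel is the cancellation path from the linked docs; both are
assumptions to verify against your setup):

    import java.net.HttpURLConnection;
    import java.net.URL;

    // Sketch: cancel a running job through the JobManager's REST API.
    // Host, port and job id are placeholders.
    public final class CancelJob {
        public static void main(String[] args) throws Exception {
            String jobId = args[0]; // the job id as shown in the web UI
            URL url = new URL("http://jobmanager:8081/jobs/" + jobId + "/cancel");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("DELETE"); // job cancellation uses a DELETE request
            int code = conn.getResponseCode();
            System.out.println("Cancellation request returned HTTP " + code);
            conn.disconnect();
        }
    }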
Maybe Aljoscha (cc'd) has an idea how to do this in a better way.
– Ufuk
On Mon, Dec 25, 2017 at 8:59 AM, Jaxon Hu <hujiaxu...@gmail.com> wrote:
I would like to manually stop FlinkKafkaConsumer from consuming data from
Kafka. But I find it won't close when I invoke the "cancel()" method. What
I am trying to do is add an EOF symbol marking the end of my Kafka data,
and when the FlatMap operator reads the symbol it invokes the
FlinkKafkaConsumer "cancel()" method. It doesn't work: the Flink streaming
job won't finish unless it gets canceled or fails, when I use Kafka as the
source.
Could somebody give me some help? Thx~~