I would like to stop FlinkKafkaConsumer consuming data from kafka manually.
But I find it won't be close when I invoke "cancel()" method. What I am
trying to do is add an EOF symbol meaning the end of my kafka data, and
when the FlatMap operator read the symbol it will invoke FlinkKafkaConsumer
"ca
Hey Jaxon,
I don't think it's possible to control this via the life-cycle methods
of your functions.
Note that Flink currently does not support graceful stop in a
meaningful manner and you can only cancel running jobs. What comes to
my mind to cancel on EOF:
1) Extend Kafka consumer to stop emit
I believe you can extend the `KeyedDeserializationSchema` that you pass to
the consumer to check for end-of-stream markers.
https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.html#isEndOfStream-T-
Eron
On
Yes, that sounds like what Jaxon is looking for. :-) Thanks for the
pointer Eron.
– Ufuk
On Thu, Dec 28, 2017 at 8:13 PM, Eron Wright wrote:
> I believe you can extend the `KeyedDeserializationSchema` that you pass to
> the consumer to check for end-of-stream markers.
>
> https://ci.apache.org/p
Thanks Eron
I have tried to read an EOF symbol and invoke FlinkKafkaConsumer's cancel
method, it doesn't work. But I invoke the method in a FlatMap operator
which is next to source operator, I guess that is the problem. I will try
your answer, thanks for your suggestion.
--
Sent from: http://a
ry();
So if no message arrives, the while running check is not done and the source
cannot be cancelled without hard interruption.
Best regards,
Arnaud
-Message d'origine-
De : Ufuk Celebi [mailto:u...@apache.org]
Envoyé : vendredi 29 décembre 2017 10:30
À : Eron Wright
Cc : Ufuk C
ge d'origine-
De : Ufuk Celebi [mailto:u...@apache.org]
Envoyé : vendredi 29 décembre 2017 10:30
À : Eron Wright
Cc : Ufuk Celebi ; Jaxon Hu ; user
; Aljoscha Krettek
Objet : Re: How to stop FlinkKafkaConsumer and make job finished?
Yes, that sounds like what Jaxon is looking for. :-) Tha
;>
>> - BTW it's also the case with RMQSource, as the "nextDelivery" in
>> RMQSource.run() is called without timeout :
>> @Override
>> public void run(SourceContext ctx) throws Exception {
>> while (running) {
>>