Yes, that sounds like what Jaxon is looking for. :-) Thanks for the pointer, Eron.
– Ufuk

On Thu, Dec 28, 2017 at 8:13 PM, Eron Wright <eronwri...@gmail.com> wrote:
> I believe you can extend the `KeyedDeserializationSchema` that you pass to
> the consumer to check for end-of-stream markers.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.html#isEndOfStream-T-
>
> Eron
>
> On Wed, Dec 27, 2017 at 12:35 AM, Ufuk Celebi <u...@apache.org> wrote:
>>
>> Hey Jaxon,
>>
>> I don't think it's possible to control this via the life-cycle methods
>> of your functions.
>>
>> Note that Flink currently does not support graceful stop in a
>> meaningful manner and you can only cancel running jobs. What comes to
>> my mind to cancel on EOF:
>>
>> 1) Extend the Kafka consumer to stop emitting records after your EOF
>> record. Look at the flink-connector-kafka-base module. This is
>> probably not feasible and takes some work to get familiar with the code.
>> Just putting it out there.
>>
>> 2) Throw a "SuccessException" that fails the job. Easy, but not nice.
>>
>> 3) Use an HTTP client and cancel your job via the HTTP endpoint
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#job-cancellation).
>> Easy, but not nice, since you need quite some logic in your function
>> (e.g. ignore records after the EOF record until cancellation, etc.).
>>
>> Maybe Aljoscha (cc'd) has an idea how to do this in a better way.
>>
>> – Ufuk
>>
>>
>> On Mon, Dec 25, 2017 at 8:59 AM, Jaxon Hu <hujiaxu...@gmail.com> wrote:
>> > I would like to manually stop FlinkKafkaConsumer from consuming data
>> > from Kafka, but I find it won't close when I invoke the "cancel()" method.
>> > What I am trying to do is add an EOF symbol marking the end of my Kafka
>> > data, and when the FlatMap operator reads the symbol it will invoke the
>> > FlinkKafkaConsumer "cancel()" method. It doesn't work. A Flink streaming
>> > job won't finish unless it gets canceled or fails when I use Kafka as
>> > the source.
>> >
>> > Could somebody who knows give me some help? Thx~~
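
For the archive, here is a rough sketch of what Eron's `KeyedDeserializationSchema` suggestion could look like. It is only an illustration under a few assumptions: records are UTF-8 strings, and the end-of-stream marker is a record whose value equals a string of your choosing (the `"EOF"` used in the demo is hypothetical). So that it compiles without Flink on the classpath, it uses a small stand-in interface, `KeyedSchemaStandIn`, that copies the `deserialize` and `isEndOfStream` signatures from the Flink 1.4 Javadoc linked above. In a real job you would implement Flink's own interface instead, which additionally requires `getProducedType()`.

```java
import java.nio.charset.StandardCharsets;
import java.io.IOException;

// ASSUMPTION: stand-in for Flink 1.4's KeyedDeserializationSchema<T>, reduced to
// the two methods relevant here so the sketch compiles without Flink.
interface KeyedSchemaStandIn<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException;

    boolean isEndOfStream(T nextElement);
}

// Decodes each Kafka value as a UTF-8 string and reports a marker record
// (hypothetical, chosen by the user) as the end of the stream.
class EofAwareStringSchema implements KeyedSchemaStandIn<String> {
    private final String eofMarker;

    EofAwareStringSchema(String eofMarker) {
        this.eofMarker = eofMarker;
    }

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        // Kafka values may be null (tombstones); pass that through unchanged.
        return message == null ? null : new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // equals() on the marker is null-safe for null elements.
        return eofMarker.equals(nextElement);
    }
}

// Small local demo: feeds a few byte arrays through the schema the way a
// consumer loop would, stopping at the first end-of-stream record.
class EndOfStreamDemo {
    public static void main(String[] args) {
        EofAwareStringSchema schema = new EofAwareStringSchema("EOF");
        String[] values = {"a", "b", "EOF", "c"};
        for (int i = 0; i < values.length; i++) {
            byte[] raw = values[i].getBytes(StandardCharsets.UTF_8);
            String record = schema.deserialize(null, raw, "demo-topic", 0, i);
            if (schema.isEndOfStream(record)) {
                System.out.println("end-of-stream marker seen at offset " + i);
                break;
            }
            System.out.println("record: " + record);
        }
    }
}
```

One caveat worth checking against your Flink version: as far as I understand, each parallel consumer subtask evaluates `isEndOfStream` on its own records, so with several partitions the marker probably needs to reach every partition for the whole source to finish.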