I believe you can extend the `KeyedDeserializationSchema` that you pass to the consumer to check for end-of-stream markers.
https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.html#isEndOfStream-T- Eron On Wed, Dec 27, 2017 at 12:35 AM, Ufuk Celebi <u...@apache.org> wrote: > Hey Jaxon, > > I don't think it's possible to control this via the life-cycle methods > of your functions. > > Note that Flink currently does not support graceful stop in a > meaningful manner and you can only cancel running jobs. What comes to > my mind to cancel on EOF: > > 1) Extend Kafka consumer to stop emitting records after your EOF > record. Look at the flink-connector-kafka-base module. This is > probably not feasible and some work to get familiar with the code. > Just putting in out there. > > 2) Throw a "SuccessException" that fails the job. Easy, but not nice. > > 3) Use an Http client and cancel your job via the Http endpoint > (https://ci.apache.org/projects/flink/flink-docs- > release-1.4/monitoring/rest_api.html#job-cancellation). > Easy, but not nice, since you need quite some logic in your function > (e.g. ignore records after EOF record until cancellation, etc.). > > Maybe Aljoscha (cc'd) has an idea how to do this in a better way. > > – Ufuk > > > On Mon, Dec 25, 2017 at 8:59 AM, Jaxon Hu <hujiaxu...@gmail.com> wrote: > > I would like to stop FlinkKafkaConsumer consuming data from kafka > manually. > > But I find it won't be close when I invoke "cancel()" method. What I am > > trying to do is add an EOF symbol meaning the end of my kafka data, and > when > > the FlatMap operator read the symbol it will invoke FlinkKafkaConsumer > > "cancel()" method. It doesn't work. Flink streaming job won't finish > unless > > it get canceled or failed, when I use kafka as source. > > > > Somebody knowing gives me some help, thx~~ >