I believe you can extend the `KeyedDeserializationSchema` that you pass to
the consumer to check for end-of-stream markers.

https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.html#isEndOfStream-T-

Eron

On Wed, Dec 27, 2017 at 12:35 AM, Ufuk Celebi <u...@apache.org> wrote:

> Hey Jaxon,
>
> I don't think it's possible to control this via the life-cycle methods
> of your functions.
>
> Note that Flink currently does not support graceful stop in a
> meaningful manner and you can only cancel running jobs. What comes to
> my mind to cancel on EOF:
>
> 1) Extend Kafka consumer to stop emitting records after your EOF
> record. Look at the flink-connector-kafka-base module. This is
> probably not feasible and some work to get familiar with the code.
> Just putting in out there.
>
> 2) Throw a "SuccessException" that fails the job. Easy, but not nice.
>
> 3) Use an Http client and cancel your job via the Http endpoint
> (https://ci.apache.org/projects/flink/flink-docs-
> release-1.4/monitoring/rest_api.html#job-cancellation).
> Easy, but not nice, since you need quite some logic in your function
> (e.g. ignore records after EOF record until cancellation, etc.).
>
> Maybe Aljoscha (cc'd) has an idea how to do this in a better way.
>
> – Ufuk
>
>
> On Mon, Dec 25, 2017 at 8:59 AM, Jaxon Hu <hujiaxu...@gmail.com> wrote:
> > I would like to stop FlinkKafkaConsumer consuming data from kafka
> manually.
> > But I find it won't be close when I invoke "cancel()" method. What I am
> > trying to do is add an EOF symbol meaning the end of my kafka data, and
> when
> > the FlatMap operator read the symbol it will invoke FlinkKafkaConsumer
> > "cancel()" method. It doesn't work. Flink streaming job won't finish
> unless
> > it get canceled or failed, when I use kafka as source.
> >
> > Somebody knowing  gives me some help, thx~~
>

Reply via email to