Yes, that sounds like what Jaxon is looking for. :-) Thanks for the
pointer Eron.

– Ufuk

On Thu, Dec 28, 2017 at 8:13 PM, Eron Wright <> wrote:
> I believe you can extend the `KeyedDeserializationSchema` that you pass to
> the consumer to check for end-of-stream markers.
> Eron
> On Wed, Dec 27, 2017 at 12:35 AM, Ufuk Celebi <> wrote:
>> Hey Jaxon,
>> I don't think it's possible to control this via the life-cycle methods
>> of your functions.
>> Note that Flink currently does not support graceful stop in a
>> meaningful manner and you can only cancel running jobs. What comes to
>> my mind to cancel on EOF:
>> 1) Extend Kafka consumer to stop emitting records after your EOF
>> record. Look at the flink-connector-kafka-base module. This is
>> probably not feasible and some work to get familiar with the code.
>> Just putting in out there.
>> 2) Throw a "SuccessException" that fails the job. Easy, but not nice.
>> 3) Use an Http client and cancel your job via the Http endpoint
>> (
>> Easy, but not nice, since you need quite some logic in your function
>> (e.g. ignore records after EOF record until cancellation, etc.).
>> Maybe Aljoscha (cc'd) has an idea how to do this in a better way.
>> – Ufuk
>> On Mon, Dec 25, 2017 at 8:59 AM, Jaxon Hu <> wrote:
>> > I would like to stop FlinkKafkaConsumer consuming data from kafka
>> > manually.
>> > But I find it won't be close when I invoke "cancel()" method. What I am
>> > trying to do is add an EOF symbol meaning the end of my kafka data, and
>> > when
>> > the FlatMap operator read the symbol it will invoke FlinkKafkaConsumer
>> > "cancel()" method. It doesn't work. Flink streaming job won't finish
>> > unless
>> > it get canceled or failed, when I use kafka as source.
>> >
>> > Somebody knowing  gives me some help, thx~~

Reply via email to