Yes, that sounds like what Jaxon is looking for. :-) Thanks for the
pointer Eron.

– Ufuk

On Thu, Dec 28, 2017 at 8:13 PM, Eron Wright <eronwri...@gmail.com> wrote:
> I believe you can extend the `KeyedDeserializationSchema` that you pass to
> the consumer to check for end-of-stream markers.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.html#isEndOfStream-T-
>
> Eron
>
> On Wed, Dec 27, 2017 at 12:35 AM, Ufuk Celebi <u...@apache.org> wrote:
>>
>> Hey Jaxon,
>>
>> I don't think it's possible to control this via the life-cycle methods
>> of your functions.
>>
>> Note that Flink currently does not support graceful stop in a
>> meaningful manner and you can only cancel running jobs. What comes to
>> my mind to cancel on EOF:
>>
>> 1) Extend Kafka consumer to stop emitting records after your EOF
>> record. Look at the flink-connector-kafka-base module. This is
>> probably not feasible and some work to get familiar with the code.
>> Just putting in out there.
>>
>> 2) Throw a "SuccessException" that fails the job. Easy, but not nice.
>>
>> 3) Use an Http client and cancel your job via the Http endpoint
>>
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html#job-cancellation).
>> Easy, but not nice, since you need quite some logic in your function
>> (e.g. ignore records after EOF record until cancellation, etc.).
>>
>> Maybe Aljoscha (cc'd) has an idea how to do this in a better way.
>>
>> – Ufuk
>>
>>
>> On Mon, Dec 25, 2017 at 8:59 AM, Jaxon Hu <hujiaxu...@gmail.com> wrote:
>> > I would like to stop FlinkKafkaConsumer consuming data from kafka
>> > manually.
>> > But I find it won't be close when I invoke "cancel()" method. What I am
>> > trying to do is add an EOF symbol meaning the end of my kafka data, and
>> > when
>> > the FlatMap operator read the symbol it will invoke FlinkKafkaConsumer
>> > "cancel()" method. It doesn't work. Flink streaming job won't finish
>> > unless
>> > it get canceled or failed, when I use kafka as source.
>> >
>> > Somebody knowing  gives me some help, thx~~
>
>

Reply via email to