Hi Robert,

Thanks for your hint / reply / help.
So far I have not tested your approach (maybe next), but I tried another one:

* use mapPartitions
  -- at the beginning, get a KafkaProducer
  -- the KafkaProducerFactory class I use is lazy and caches the first instance created, so there is reuse
* register a JVM shutdown hook for closing the KafkaProducer

So far I have run into some performance issues, but I don't know yet whether they are due to my pattern or to something else.

Anyway, thanks.

Regards,
Dominique

On Fri, Jan 31, 2020 at 14:20, Robert Metzger <[email protected]> wrote:

> Hi,
>
> Flink's ProcessFunction has a close() method, which is executed on
> shutdown of the workers. (You could also use any of the Rich* functions for
> that purpose).
> If you add a ProcessFunction with the same parallelism before the
> KafkaSink, it'll be executed on the same machines as the Kafka producer.
>
> Afaik, the close() call should not take forever, as the system might
> interrupt your thread if it doesn't finish closing on time (30s is the
> default for "cluster.services.shutdown-timeout")
>
> Best,
> Robert
>
>
> On Tue, Jan 21, 2020 at 10:02 AM Dominique De Vito <[email protected]>
> wrote:
>
>> Hi,
>>
>> For a Flink batch job, some value are writing to Kafka through a Producer.
>>
>> I want to register a hook for closing (at the end) the Kafka producer a
>> worker is using.... hook to be executed, of course, on worker side.
>>
>> Is there a way to do so ?
>>
>> Thanks.
>>
>> Regards,
>> Dominique
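PS: for anyone reading the archive, here is a minimal sketch of the pattern described at the top of this message (a lazily created, cached producer that is closed by a JVM shutdown hook). It is not the actual KafkaProducerFactory code: CachedResource is a hypothetical name, and a generic AutoCloseable stands in for KafkaProducer so the sketch compiles with the JDK alone. In a real job, get() would presumably be called at the start of the mapPartitions function.

```java
import java.util.function.Supplier;

/**
 * Hypothetical stand-in for a lazy, caching producer factory.
 * T would be KafkaProducer in a real job (assumption, not shown here).
 */
final class CachedResource<T extends AutoCloseable> {
    private final Supplier<T> creator;
    private T instance;      // guarded by "this"
    private boolean closed;  // guarded by "this"

    CachedResource(Supplier<T> creator) {
        this.creator = creator;
    }

    /** Returns the cached instance; on first use, creates it and registers the shutdown hook. */
    synchronized T get() {
        if (closed) {
            throw new IllegalStateException("resource already closed");
        }
        if (instance == null) {
            instance = creator.get();
            // The hook runs once, when this JVM (the worker) shuts down.
            Runtime.getRuntime().addShutdownHook(new Thread(this::close, "resource-closer"));
        }
        return instance;
    }

    /** Idempotent close: safe to call explicitly and again from the shutdown hook. */
    synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        if (instance != null) {
            try {
                instance.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
```

One thing to keep in mind with this design: the hook fires only when the worker JVM exits, not when the batch job finishes, so on a long-running cluster the producer may stay open well after the job is done.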
