Hi, Yes we are aware of this issue and we would like to have it soon, but at the moment it does not look like clean shutdown will be ready for Flink 1.5.
Another solution is Kafka exactly-once producer implemented on top of the GenericWriteAheadSink. It could avoid this issue (at a cost of significantly higher overhead). There are plans to implement such producer as an alternative to the current one, but I do not know the timeline for that. It should be relatively easy task and we would welcome such contribution. Piotrek > On 14 Dec 2017, at 01:43, Elias Levy <fearsome.lucid...@gmail.com> wrote: > > I am re-upping this thread now that FlinkKafkaProducer011 is out. The new > producer, when used with the exactly once semantics, has the rather > troublesome behavior that it will fallback to at-most-once, rather than > at-least-once, if the job is down for longer than the Kafka broker's > transaction.max.timeout.ms <http://transaction.max.timeout.ms/> setting. > > In situations that require extended maintenance downtime, this behavior is > nearly certain to lead to message loss, as a canceling a job while taking a > savepoint will not wait for the Kafka transactions to bet committed and is > not atomic. > > So it seems like there is a need for an atomic stop or cancel with savepoint > that waits for transactional sinks to commit and then immediately stop any > further message processing. > > > On Tue, Oct 24, 2017 at 4:46 AM, Piotr Nowojski <pi...@data-artisans.com > <mailto:pi...@data-artisans.com>> wrote: > I would propose implementations of NewSource to be not blocking/asynchronous. > For example something like > > public abstract Future<T> getCurrent(); > > Which would allow us to perform some certain actions while there are no data > available to process (for example flush output buffers). Something like this > came up recently when we were discussing possible future changes in the > network stack. It wouldn’t complicate API by a lot, since default > implementation could just: > > public Future<T> getCurrent() { > return completedFuture(getCurrentBlocking()); > } > > Another thing to consider is maybe we would like to leave the door open for > fetching records in some batches from the source’s input buffers? Source > function (like Kafka) have some internal buffers and it would be more > efficient to read all/deserialise all data present in the input buffer at > once, instead of paying synchronisation/calling virtual method/etc costs once > per each record. > > Piotrek > >> On 22 Sep 2017, at 11:13, Aljoscha Krettek <aljos...@apache.org >> <mailto:aljos...@apache.org>> wrote: >> >> @Eron Yes, that would be the difference in characterisation. I think >> technically all sources could be transformed by that by pushing data into a >> (blocking) queue and having the "getElement()" method pull from that. >> >>> On 15. Sep 2017, at 20:17, Elias Levy <fearsome.lucid...@gmail.com >>> <mailto:fearsome.lucid...@gmail.com>> wrote: >>> >>> On Fri, Sep 15, 2017 at 10:02 AM, Eron Wright <eronwri...@gmail.com >>> <mailto:eronwri...@gmail.com>> wrote: >>> Aljoscha, would it be correct to characterize your idea as a 'pull' source >>> rather than the current 'push'? It would be interesting to look at the >>> existing connectors to see how hard it would be to reverse their >>> orientation. e.g. the source might require a buffer pool. >>> >>> The Kafka client works that way. As does the QueueingConsumer used by the >>> RabbitMQ source. The Kinesis and NiFi sources also seems to poll. Those >>> are all the bundled sources. >> > >