Thanks! It looks like the fix is already in for 0.10.1.0.

On Tue, Oct 4, 2016 at 6:18 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Created https://issues.apache.org/jira/browse/KAFKA-4253 for this issue.
>
>
> Guozhang
>
> On Tue, Oct 4, 2016 at 3:08 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Srikanth,
> >
> > We close the underlying clients before closing the state manager (hence
> > the states) because for example we need to make sure producer's sent
> > records have all been acked before the state manager records the
> > changelog sent offsets as end offsets. This is kind of chicken-and-egg
> > problem, and we may be able to re-order the shutting down process in the
> > future with some added shutdown hooks.
> >
> > As of now, there is not a perfect solution to your scenario, and I would
> > like to suggest checking if producer's own batching mechanism is good
> > enough so you do not need to do this in the streams client layer.
> >
> >
> > Guozhang
> >
> > On Sat, Oct 1, 2016 at 2:20 PM, Srikanth <srikanth...@gmail.com> wrote:
> >
> >> Hello,
> >>
> >> I'm testing out a WriteToSinkProcessor() that batches records before
> >> writing it to a sink.
> >> The actual commit to sink happens in punctuate(). I also wanted to
> >> commit in close().
> >> Idea here is, during a regular shutdown, we'll commit all records and
> >> ideally stop with an empty state.
> >> My commit() process is 3 step 1) Read from KV store 2) write to sink
> >> 3) delete written keys from KV store.
> >>
> >> I get this exception when closing though. It looks like the kafka
> >> producer is closed before the changelog topic is updated after close().
> >> Should the producer be closed after all tasks and processors are closed?
> >>
> >> 16/10/01 17:01:15 INFO StreamThread-1 WriteToSinkProcessor: Closing processor instance
> >> 16/10/01 17:01:16 ERROR StreamThread-1 StreamThread: Failed to remove stream tasks in thread [StreamThread-1]:
> >> java.lang.IllegalStateException: Cannot send after the producer is closed.
> >>   at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:173)
> >>   at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:467)
> >>   at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
> >>   at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:84)
> >>   at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:71)
> >>   at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:108)
> >>   at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.flush(InMemoryKeyValueLoggedStore.java:161)
> >>   at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:165)
> >>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:343)
> >>   at org.apache.kafka.streams.processor.internals.AbstractTask.close(AbstractTask.java:112)
> >>   at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:317)
> >>
> >> Srikanth
> >>
> >
> > --
> > -- Guozhang
> >
>
> --
> -- Guozhang
>
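
For anyone reading this thread later, the ordering problem discussed above can be reproduced in a small stand-alone model. This is only a sketch and does not use the Kafka Streams API: FakeProducer, FakeLoggedStore, commit() and flushFails() are hypothetical names made up for illustration. It assumes, as the stack trace suggests, that a logged store sends its pending changes to the changelog when it is flushed, and that the processor's three-step commit (read from the KV store, write to the sink, delete the written keys) runs in close() before the producer is closed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ShutdownOrderSketch {

    /** Hypothetical stand-in for the producer that writes changelog records. */
    static class FakeProducer {
        private boolean closed = false;
        final List<String> sent = new ArrayList<>();

        void send(String record) {
            if (closed) {
                throw new IllegalStateException("Cannot send after the producer is closed.");
            }
            sent.add(record);
        }

        void close() {
            closed = true;
        }
    }

    /** Hypothetical logged KV store: changes are remembered and only logged on flush(). */
    static class FakeLoggedStore {
        final Map<String, String> data = new LinkedHashMap<>();
        private final List<String> dirtyKeys = new ArrayList<>();
        private final FakeProducer changelog;

        FakeLoggedStore(FakeProducer changelog) {
            this.changelog = changelog;
        }

        void put(String key, String value) {
            data.put(key, value);
            dirtyKeys.add(key);
        }

        void delete(String key) {
            data.remove(key);
            dirtyKeys.add(key);
        }

        void flush() {
            // A deleted key is logged with a null value (a tombstone in this model).
            for (String key : dirtyKeys) {
                changelog.send(key + "=" + data.get(key));
            }
            dirtyKeys.clear();
        }
    }

    /** The three-step commit from the original question. */
    static void commit(FakeLoggedStore store, List<String> sink) {
        List<String> keys = new ArrayList<>(store.data.keySet()); // 1) read from KV store
        for (String key : keys) {
            sink.add(key + "=" + store.data.get(key));            // 2) write to sink
            store.delete(key);                                    // 3) delete written key
        }
    }

    /** Returns true if the final store flush fails for the given shutdown order. */
    static boolean flushFails(boolean closeProducerFirst) {
        FakeProducer producer = new FakeProducer();
        FakeLoggedStore store = new FakeLoggedStore(producer);
        List<String> sink = new ArrayList<>();

        store.put("k1", "v1");
        store.flush();       // normal operation: the change is logged

        commit(store, sink); // processor close(): the deletes make the store dirty again

        try {
            if (closeProducerFirst) {
                producer.close();
                store.flush(); // the pending deletes can no longer be sent
            } else {
                store.flush();
                producer.close();
            }
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("producer closed first, flush fails: " + flushFails(true));
        System.out.println("store flushed first, flush fails: " + flushFails(false));
    }
}
```

In this model the failure appears only because close() modifies the store after the last point at which changes can still reach the changelog. That matches the explanation quoted above, but the real shutdown sequence has more steps than this sketch shows.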