Thanks! Looks like the fix is already in for 0.10.1.0.

On Tue, Oct 4, 2016 at 6:18 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Created https://issues.apache.org/jira/browse/KAFKA-4253 for this issue.
>
>
> Guozhang
>
> On Tue, Oct 4, 2016 at 3:08 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Srikanth,
> >
> > We close the underlying clients before closing the state manager (and hence
> > the states) because, for example, we need to make sure the producer's sent
> > records have all been acked before the state manager records the changelog's
> > sent offsets as end offsets. This is kind of a chicken-and-egg problem, and
> > we may be able to re-order the shutdown process in the future with some
> > added shutdown hooks.
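> >
> > In rough terms, the ordering being described is something like the
> > following (a simplified sketch only; the Closeable fields are
> > placeholders, not the actual Streams classes):
> >
> > import java.io.Closeable;
> > import java.io.IOException;
> >
> > class ShutdownOrderSketch {
> >     Closeable task;         // user processors; close() may still write to state stores
> >     Closeable producer;     // the internal Kafka producer
> >     Closeable stateManager; // flushes stores, records changelog end offsets
> >
> >     void shutdown() throws IOException {
> >         task.close();         // 1) close user processors first
> >         producer.close();     // 2) flush and ack all in-flight changelog sends
> >         stateManager.close(); // 3) checkpoint offsets; flushing stores here
> >                               //    can trigger new sends on the closed producer
> >     }
> > }
> >
> > The store flush inside the state manager's close() is what ends up hitting
> > the already-closed producer in your stack trace.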
> >
> > As of now, there is no perfect solution to your scenario, and I would
> > suggest checking whether the producer's own batching mechanism is good
> > enough, so that you do not need to do this in the Streams client layer.
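> >
> > For example, something like this might be enough (a minimal sketch; the
> > application id and broker address are placeholders, and the batching
> > values are just illustrative):
> >
> > import java.util.Properties;
> > import org.apache.kafka.clients.producer.ProducerConfig;
> > import org.apache.kafka.streams.StreamsConfig;
> >
> > Properties props = new Properties();
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sink-writer");       // placeholder
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
> > // Streams passes producer configs through to its internal producer, so the
> > // producer can do the batching instead of the processor:
> > props.put(ProducerConfig.LINGER_MS_CONFIG, "100");    // wait up to 100 ms to fill a batch
> > props.put(ProducerConfig.BATCH_SIZE_CONFIG, "65536"); // up to 64 KB per partition batch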
> >
> >
> > Guozhang
> >
> > On Sat, Oct 1, 2016 at 2:20 PM, Srikanth <srikanth...@gmail.com> wrote:
> >
> >> Hello,
> >>
> >> I'm testing out a WriteToSinkProcessor() that batches records before
> >> writing them to a sink.
> >> The actual commit to the sink happens in punctuate(). I also wanted to
> >> commit in close().
> >> The idea is that, during a regular shutdown, we'll commit all records and
> >> ideally stop with an empty state.
> >> My commit() process has 3 steps: 1) read from the KV store, 2) write to
> >> the sink, 3) delete the written keys from the KV store, roughly as
> >> sketched below.
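> >>
> >> In code, the processor looks roughly like this (a simplified sketch; the
> >> SinkClient interface and the "buffer" store name are placeholders):
> >>
> >> import java.util.ArrayList;
> >> import java.util.List;
> >> import org.apache.kafka.streams.KeyValue;
> >> import org.apache.kafka.streams.processor.Processor;
> >> import org.apache.kafka.streams.processor.ProcessorContext;
> >> import org.apache.kafka.streams.state.KeyValueIterator;
> >> import org.apache.kafka.streams.state.KeyValueStore;
> >>
> >> public class WriteToSinkProcessor implements Processor<String, String> {
> >>     interface SinkClient { void write(String key, String value); } // placeholder sink
> >>
> >>     private KeyValueStore<String, String> store;
> >>     private SinkClient sink;
> >>
> >>     @Override
> >>     @SuppressWarnings("unchecked")
> >>     public void init(ProcessorContext context) {
> >>         store = (KeyValueStore<String, String>) context.getStateStore("buffer");
> >>         context.schedule(60_000L); // punctuate roughly once a minute
> >>     }
> >>
> >>     @Override
> >>     public void process(String key, String value) {
> >>         store.put(key, value); // only buffer here; the real write happens in commit()
> >>     }
> >>
> >>     @Override
> >>     public void punctuate(long timestamp) { commit(); }
> >>
> >>     @Override
> >>     public void close() { commit(); } // flush whatever is left on shutdown
> >>
> >>     private void commit() {
> >>         List<String> written = new ArrayList<>();
> >>         try (KeyValueIterator<String, String> it = store.all()) {
> >>             while (it.hasNext()) {                  // 1) read from the KV store
> >>                 KeyValue<String, String> entry = it.next();
> >>                 sink.write(entry.key, entry.value); // 2) write to the sink
> >>                 written.add(entry.key);
> >>             }
> >>         }
> >>         for (String key : written) {
> >>             store.delete(key);                      // 3) delete the written keys
> >>         }
> >>     }
> >> }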
> >>
> >> I get this exception when closing, though. It looks like the Kafka
> >> producer is closed before the changelog topic is updated after close().
> >> Should the producer be closed after all tasks and processors are closed?
> >>
> >> 16/10/01 17:01:15 INFO StreamThread-1 WriteToSinkProcessor: Closing processor instance
> >> 16/10/01 17:01:16 ERROR StreamThread-1 StreamThread: Failed to remove stream tasks in thread [StreamThread-1]:
> >> java.lang.IllegalStateException: Cannot send after the producer is closed.
> >>         at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:173)
> >>         at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:467)
> >>         at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
> >>         at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:84)
> >>         at org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:71)
> >>         at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:108)
> >>         at org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.flush(InMemoryKeyValueLoggedStore.java:161)
> >>         at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:165)
> >>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:343)
> >>         at org.apache.kafka.streams.processor.internals.AbstractTask.close(AbstractTask.java:112)
> >>         at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:317)
> >>
> >> Srikanth
> >>
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>
