Hi Guozhang,

I specifically do not want to do a get() on every future. This is
equivalent to going "sync"ed, which is a performance killer for us.

What I am looking for is some way to tell a producer to forget about queued
messages. I have done some more testing on my solution and it needs some
work. That said, the reliance on a thread named
"kafka-producer-network-thread" is something I don't like -- too much
knowledge of the KafkaProducer's internals.
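
One direction that would at least avoid the thread-name check is a small gate
in front of send() that closes on the first failed callback. Below is a minimal
sketch of that idea, not a tested solution. SendGate and AsyncSender are
hypothetical names of my own and not part of the Kafka client; AsyncSender
stands in for producer.send(record, callback), with the callback receiving
null on success.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical gate in front of an async send; not part of the Kafka client. */
final class SendGate<T> {

    /** Stand-in for producer.send(record, callback); callback gets null on success. */
    interface AsyncSender<T> {
        void send(T message, Consumer<Exception> onCompletion);
    }

    private final AsyncSender<T> sender;
    private final AtomicBoolean open = new AtomicBoolean(true);
    private final AtomicLong dropped = new AtomicLong();

    SendGate(AsyncSender<T> sender) {
        this.sender = sender;
    }

    /** Returns true if the message was handed to the sender, false if it was dropped. */
    boolean trySend(T message) {
        if (!open.get()) {
            // Gate already closed by an earlier failure: drop without blocking.
            dropped.incrementAndGet();
            return false;
        }
        sender.send(message, exception -> {
            if (exception != null) {
                // First failure closes the gate, whichever thread runs the callback.
                open.set(false);
            }
        });
        return true;
    }

    boolean isOpen() {
        return open.get();
    }

    long droppedCount() {
        return dropped.get();
    }
}
```

This only stops me from queueing more messages once something has failed. It
does nothing about the messages already sitting in the producer's buffer, which
is the part I still have no clean answer for.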

Hoping for other suggestions...


Andrew Stein


On Fri, Sep 19, 2014 at 2:31 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Andrew,
>
> I think you would want a sync producer for your use case? You can try to
> call get() on the returned metadata future of the send() call instead of
> using a callback; the pattern is something like:
>
> for (ProducerRecord message : messages) {
>     producer.send(message).get(); // blocks until the response arrives
> }
>
> The get() call will block until the message response has been received, and
> will throw an exception if the response is a failure; you can then catch the
> exception and pause the producer.
>
> Guozhang
>
> On Thu, Sep 18, 2014 at 6:30 PM, Andrew Stein <
> andrew.st...@quantumretail.com> wrote:
>
> > I am trying to understand the best practices for working with the new
> > (0.8.2) Producer interface.
> >
> > We have a process in a large server that writes a lot of data to Kafka.
> > However, this data is not mission critical. When a problem arises writing
> > to Kafka, most specifically network issues, but also full Producer buffers,
> > we want the server to continue working, but to stop sending data to Kafka,
> > allowing other tasks to continue. The issue I have is handling messages
> > that have been "sent" to the producer but are waiting to go to Kafka. These
> > messages remain long after my processing is over, timing out, writing to
> > the logs, and preventing me from moving forward. I am looking for some way
> > to tell the client to stop forwarding messages to Kafka.
> >
> > This is what I have so far:
> >
> >     class ErrorCallback implements Callback {
> >         @Override
> >         public void onCompletion(RecordMetadata metadata, Exception exception) {
> >             if (exception == null) { // The message was sent.
> >                 return;
> >             }
> >
> >             stopProducerSendAndClose();
> >             String threadName = Thread.currentThread().getName();
> >             if (!threadName.equals("kafka-producer-network-thread")) {
> >                 // Some of the callbacks happen on my thread.
> >             } else {
> >                 // We are in KafkaProducer's ioThread ==> commit suicide.
> >                 Thread.currentThread().interrupt();
> >                 // Cannot throw an Exception as it will just be caught and logged.
> >                 throw new ThreadDeath();
> >             }
> >         }
> >     }
> >
> > My question is, is this the correct approach, or is there some other way to
> > stop sending messages (short of going "sync"ed)?
> >
> > Andrew Stein
> >
>
>
>
> --
> -- Guozhang
>
