Andrew, if you look at BaseProducer.scala, there's the following code snippet:
override def send(topic: String, key: Array[Byte], value: Array[Byte]) {
  val record = new ProducerRecord(topic, key, value)
  if (sync) {
    this.producer.send(record).get()
  } else {
    this.producer.send(record, new ErrorLoggingCallback(topic, key, value, false))
  }
}

NewShinyProducer is used in ProducerPerformance.scala. When I tested both sync mode and async mode, I didn't see much difference, so I guess you can safely use sync mode. Also, there's ProducerPerformance.java in the new version of the producer:

Callback cb = stats.nextCompletion(sendStart, payload.length, stats);
producer.send(record, cb);

public Callback nextCompletion(long start, int bytes, Stats stats) {
    Callback cb = new PerfCallback(this.iteration, start, bytes, stats);
    this.iteration++;
    return cb;
}

private static final class PerfCallback implements Callback {
    private final long start;
    private final int iteration;
    private final int bytes;
    private final Stats stats;

    public PerfCallback(int iter, long start, int bytes, Stats stats) {
        this.start = start;
        this.stats = stats;
        this.iteration = iter;
        this.bytes = bytes;
    }

    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long now = System.currentTimeMillis();
        int latency = (int) (now - start);
        this.stats.record(iteration, latency, bytes, now);
        if (exception != null)
            exception.printStackTrace();
    }
}

I hope this will be helpful.

On Fri, Sep 19, 2014 at 1:47 PM, Andrew Stein <
andrew.st...@quantumretail.com> wrote:

> Hi Guozhang,
>
> I specifically do not want to do a get() on every future. This is
> equivalent to going "sync"ed, which is a performance killer for us.
>
> What I am looking for is some way to tell a producer to forget about
> queued messages. I have done some more testing on my solution and it
> needs some work. That said, the reliance on a thread named
> "kafka-producer-network-thread" is something I don't like -- too much
> knowledge of the KafkaProducer's internals.
>
> Hoping for other suggestions...
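As an aside, the sync/async split in BaseProducer.send above can be sketched broker-free with plain java.util.concurrent types; send() here is a hypothetical stand-in for the real producer's send(), not Kafka's API:

```java
import java.util.concurrent.CompletableFuture;

public class SyncAsyncSketch {
    // Hypothetical stand-in for producer.send(): completes asynchronously
    // with the payload length as a fake "offset".
    static CompletableFuture<Long> send(byte[] value) {
        return CompletableFuture.supplyAsync(() -> (long) value.length);
    }

    public static void main(String[] args) throws Exception {
        // "Sync" mode: block on the returned future, as BaseProducer does
        // when sync == true. Failures surface immediately at the call site.
        long offset = send(new byte[]{1, 2, 3}).get();
        System.out.println("sync offset: " + offset);

        // "Async" mode: register a callback instead of blocking; errors are
        // only logged, mirroring ErrorLoggingCallback.
        send(new byte[]{4, 5}).whenComplete((o, ex) -> {
            if (ex != null) ex.printStackTrace();
            else System.out.println("async offset: " + o);
        }).join(); // join only so the demo waits before exiting
    }
}
```

The difference is purely in where errors are observed: at the caller (sync) or in the callback (async); the send itself is the same.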
>
>
> Andrew Stein
>
>
> On Fri, Sep 19, 2014 at 2:31 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Andrew,
> >
> > I think you would want a sync producer for your use case? You can try to
> > call get() on the returned metadata future of the send() call instead of
> > using a callback; the pattern is something like:
> >
> > for (message in messages)
> >     producer.send(message).get()
> >
> > The get() call will block until the message response has been received,
> > and will throw an exception if the response is a failure; you can then
> > catch the exception and pause the producer.
> >
> > Guozhang
> >
> > On Thu, Sep 18, 2014 at 6:30 PM, Andrew Stein <
> > andrew.st...@quantumretail.com> wrote:
> >
> > > I am trying to understand the best practices for working with the new
> > > (0.8.2) Producer interface.
> > >
> > > We have a process in a large server that writes a lot of data to Kafka.
> > > However, this data is not mission critical. When a problem arises
> > > writing to Kafka, most specifically network issues, but also full
> > > Producer buffers, we want the server to continue working, but to stop
> > > sending data to Kafka, allowing other tasks to continue. The issue I
> > > have is handling messages that have been "sent" to the producer but are
> > > waiting to go to Kafka. These messages remain long after my processing
> > > is over, timing out, writing to the logs, and preventing me from moving
> > > forward. I am looking for some way to tell the client to stop
> > > forwarding messages to Kafka.
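Guozhang's per-message get() pattern quoted above can be sketched without a broker, with a CompletableFuture standing in for the metadata future; send() and sendUntilFailure() here are hypothetical helpers for illustration, not the Kafka API:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SyncSendSketch {
    // Hypothetical stand-in for producer.send(): fails on empty payloads.
    static CompletableFuture<String> send(String message) {
        CompletableFuture<String> f = new CompletableFuture<>();
        if (message.isEmpty()) {
            f.completeExceptionally(new IllegalStateException("broker unreachable"));
        } else {
            f.complete("acked:" + message);
        }
        return f;
    }

    // Send messages one by one, blocking on each future; stop at the
    // first failure, which get() surfaces as an ExecutionException.
    static int sendUntilFailure(List<String> messages) {
        int sent = 0;
        for (String m : messages) {
            try {
                send(m).get();   // blocks; throws if the send failed
                sent++;
            } catch (InterruptedException | ExecutionException e) {
                break;           // "pause the producer" on the first error
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(sendUntilFailure(List.of("a", "b", "", "c")));
    }
}
```

This is exactly the "sync'ed" behavior Andrew wants to avoid: one blocking round trip per message, but no message ever left queued after a failure.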
> > >
> > > This is what I have so far:
> > >
> > > class ErrorCallback implements Callback {
> > >     @Override
> > >     public void onCompletion(RecordMetadata metadata, Exception exception) {
> > >         if (exception == null) { // The message was sent.
> > >             return;
> > >         }
> > >
> > >         stopProducerSendAndClose();
> > >         String threadName = Thread.currentThread().getName();
> > >         if (!threadName.equals("kafka-producer-network-thread")) {
> > >             // Some of the callbacks happen on my thread.
> > >         } else {
> > >             // We are in KafkaProducer's ioThread ==> commit suicide.
> > >             Thread.currentThread().interrupt();
> > >             throw new ThreadDeath(); // Cannot throw an Exception, as it
> > >                                      // will just be caught and logged.
> > >         }
> > >     }
> > > }
> > >
> > > My question is: is this the correct approach, or is there some other
> > > way to stop sending messages (short of going "sync"ed)?
> > >
> > > Andrew Stein
> >
> >
> > --
> > -- Guozhang
> >
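For what it's worth, one gentler alternative to interrupting the producer's I/O thread is to flip a shared flag from the callback and simply stop handing new records to the producer. The sketch below is stdlib-only, with hypothetical trySend/onCompletion stand-ins rather than the real KafkaProducer; note that it does not cancel records already buffered, so it only partially addresses Andrew's question about queued messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class StopFlagSketch {
    private final AtomicBoolean failed = new AtomicBoolean(false);
    final List<String> log = new ArrayList<>();

    // Hypothetical wrapper around producer.send(): once any callback has
    // reported a failure, drop new records instead of queueing them.
    void trySend(String message, boolean simulateError) {
        if (failed.get()) {
            log.add("dropped " + message);
            return;
        }
        // Stand-in for producer.send(record, callback); in real Kafka the
        // callback would fire later on the I/O thread.
        onCompletion(message, simulateError ? new RuntimeException("send failed") : null);
    }

    // What the callback would do: flip the flag instead of throwing
    // ThreadDeath inside the producer's network thread.
    void onCompletion(String message, Exception exception) {
        if (exception != null) {
            failed.set(true);
            log.add("failed " + message);
        } else {
            log.add("sent " + message);
        }
    }

    public static void main(String[] args) {
        StopFlagSketch p = new StopFlagSketch();
        p.trySend("m1", false);
        p.trySend("m2", true);
        p.trySend("m3", false); // dropped: the flag is already set
        System.out.println(p.log);
    }
}
```

The flag keeps all the logic on the application side, so nothing depends on the producer's thread names or internals.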