Andrew

If you look at BaseProducer.scala, you'll find the following code snippet:

override def send(topic: String, key: Array[Byte], value: Array[Byte]) {
  val record = new ProducerRecord(topic, key, value)
  if (sync) {
    this.producer.send(record).get()
  } else {
    this.producer.send(record,
      new ErrorLoggingCallback(topic, key, value, false))
  }
}

NewShinyProducer is used in ProducerPerformance.scala. When I tested both
sync mode and async mode, I didn't see much difference. So, I guess you can
safely use sync.

Also, there's ProducerPerformance.java in the new version of producer.

            Callback cb = stats.nextCompletion(sendStart, payload.length, stats);
            producer.send(record, cb);

        public Callback nextCompletion(long start, int bytes, Stats stats) {
            Callback cb = new PerfCallback(this.iteration, start, bytes, stats);
            this.iteration++;
            return cb;
        }

private static final class PerfCallback implements Callback {
        private final long start;
        private final int iteration;
        private final int bytes;
        private final Stats stats;

        public PerfCallback(int iter, long start, int bytes, Stats stats) {
            this.start = start;
            this.stats = stats;
            this.iteration = iter;
            this.bytes = bytes;
        }

        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long now = System.currentTimeMillis();
            int latency = (int) (now - start);
            this.stats.record(iteration, latency, bytes, now);
            if (exception != null)
                exception.printStackTrace();
        }
    }
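
If the goal is only to stop handing new records to the producer after the
first failure, without checking the network thread's name, one option might be
a shared flag that the callback flips. Below is a minimal sketch of that idea.
GuardedSender and AsyncSender are hypothetical names I made up for
illustration; AsyncSender merely stands in for producer.send(record, callback)
and is not part of the Kafka client. Note that this only stops new sends. It
does not discard records the producer has already buffered.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

/** Sketch only: stops handing messages to a sender once any send has failed. */
final class GuardedSender {

    /** Hypothetical stand-in for producer.send(record, callback). */
    interface AsyncSender {
        void send(String message, BiConsumer<String, Exception> callback);
    }

    private final AsyncSender sender;

    // True while sends are allowed; the callback flips it to false on the
    // first failure. AtomicBoolean because callbacks may run on another thread.
    private final AtomicBoolean open = new AtomicBoolean(true);

    GuardedSender(AsyncSender sender) {
        this.sender = sender;
    }

    /**
     * Returns true if the message was handed to the sender, false if it was
     * dropped because an earlier send reported an exception.
     */
    boolean trySend(String message) {
        if (!open.get()) {
            return false; // an earlier send failed: drop instead of queueing
        }
        sender.send(message, (msg, exception) -> {
            if (exception != null) {
                open.set(false); // no thread-name check, no ThreadDeath
            }
        });
        return true;
    }

    boolean isOpen() {
        return open.get();
    }
}
```

The caller checks the return value of trySend() (or isOpen()) and carries on
with its other work when it is false; closing the real producer would still
have to happen somewhere outside the callback.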

I hope this will be helpful.

On Fri, Sep 19, 2014 at 1:47 PM, Andrew Stein <
andrew.st...@quantumretail.com> wrote:

> Hi Guozhang,
>
> I specifically do not want to do a get() on every future. This is
> equivalent to going "sync"ed, which is a performance killer for us.
>
> What I am looking for is some way to tell a producer to forget about queued
> messages. I have done some more testing on my solution and it needs some
> work. That said, the reliance on a thread named
> "kafka-producer-network-thread" is something I don't like -- too much
> knowledge of the KafkaProducer's internals.
>
> Hoping for other suggestions...
>
>
> Andrew Stein
>
>
> On Fri, Sep 19, 2014 at 2:31 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Andrew,
> >
> > I think you would want a sync producer for your use case? You can try to
> > call get() on the returned metadata future of the send() call instead of
> > using a callback; the pattern is something like:
> >
> > for (message in messages)
> >     producer.send(message).get()
> >
> > The get() call will block until the message response has been received,
> > and will throw an exception if the response is a "failure"; you can then
> > catch the exception and pause the producer.
> >
> > Guozhang
> >
> > On Thu, Sep 18, 2014 at 6:30 PM, Andrew Stein <
> > andrew.st...@quantumretail.com> wrote:
> >
> > > I am trying to understand the best practices for working with the new
> > > (0.8.2) Producer interface.
> > >
> > > We have a process in a large server that writes a lot of data to Kafka.
> > > However, this data is not mission critical. When a problem arises writing
> > > to Kafka, most specifically network issues, but also full Producer
> > > buffers, we want the server to continue working, but to stop sending data
> > > to Kafka, allowing other tasks to continue. The issue I have is handling
> > > messages that have been "sent" to the producer but are waiting to go to
> > > Kafka. These messages remain long after my processing is over, timing
> > > out, writing to the logs, and preventing me from moving forward. I am
> > > looking for some way to tell the client to stop forwarding messages to
> > > Kafka.
> > >
> > > This is what I have so far:
> > >
> > >     class ErrorCallback implements Callback {
> > >         @Override
> > >         public void onCompletion(RecordMetadata metadata, Exception exception) {
> > >             if (exception == null) { // The message was sent.
> > >                 return;
> > >             }
> > >
> > >             stopProducerSendAndClose();
> > >             String threadName = Thread.currentThread().getName();
> > >             if (!threadName.equals("kafka-producer-network-thread")) {
> > >                 // Some of the callbacks happen on my thread
> > >             } else { // We are in KafkaProducer's ioThread ==> commit suicide.
> > >                 Thread.currentThread().interrupt();
> > >                 // Cannot throw an Exception as it will just be caught and logged.
> > >                 throw new ThreadDeath();
> > >             }
> > >         }
> > >     }
> > >
> > > My question is, is this the correct approach, or is there some other way
> > > to stop sending messages (short of going "sync"ed)?
> > >
> > > Andrew Stein
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
