I am trying to understand the best practices for working with the new
(0.8.2) Producer interface.

We have a process in a large server that writes a lot of data to Kafka.
However, this data is not mission critical. When a problem arises writing
to Kafka (most often network issues, but also full Producer buffers), we
want the server to keep working: it should stop sending data to Kafka and
let its other tasks continue. The issue I have is handling messages that
have already been "sent" to the producer but are still waiting to go out
to Kafka. These messages linger long after my processing is over, timing
out, writing to the logs, and preventing me from moving forward. I am
looking for some way to tell the client to stop forwarding messages to
Kafka.
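
For reference, here is a minimal sketch of the kind of producer setup
involved (the broker address and serializers are placeholders, and the
retries and block.on.buffer.full settings are only there to illustrate
failing fast rather than blocking):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    public class ProducerSetup {
        public static KafkaProducer<byte[], byte[]> create() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092"); // placeholder
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            // The data is not mission critical, so fail fast instead of
            // retrying or blocking the caller when the buffer fills up.
            props.put("retries", "0");
            props.put("block.on.buffer.full", "false");
            return new KafkaProducer<byte[], byte[]>(props);
        }
    }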

This is what I have so far:

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;

    class ErrorCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null) { // The message was sent successfully.
                return;
            }

            // Defined elsewhere: stop handing new records to the producer
            // (see the sketch below).
            stopProducerSendAndClose();

            // Some of the callbacks happen on my own thread; the rest run
            // on KafkaProducer's ioThread.
            String threadName = Thread.currentThread().getName();
            if (threadName.equals("kafka-producer-network-thread")) {
                // We are in KafkaProducer's ioThread ==> commit suicide.
                Thread.currentThread().interrupt();
                // Cannot throw an Exception, as it would just be caught and logged.
                throw new ThreadDeath();
            }
        }
    }
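
For context, the callback is attached on every send, and a flag keeps any
further records from being handed to the producer once an error has been
seen. Roughly (the class, topic name, and flag below are only a sketch of
what stopProducerSendAndClose() amounts to, not the exact code):

    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class KafkaWriter {
        // Once set, no more records are handed to the producer.
        static final AtomicBoolean kafkaDown = new AtomicBoolean(false);

        private final KafkaProducer<byte[], byte[]> producer;

        public KafkaWriter(KafkaProducer<byte[], byte[]> producer) {
            this.producer = producer;
        }

        // Called from the server's worker threads.
        public void write(byte[] key, byte[] value) {
            if (kafkaDown.get()) {
                return; // Kafka is down; drop the record, it is not mission critical.
            }
            ProducerRecord<byte[], byte[]> record =
                    new ProducerRecord<byte[], byte[]>("my-topic", key, value); // placeholder topic
            producer.send(record, new ErrorCallback());
        }

        // Roughly what ErrorCallback calls as stopProducerSendAndClose():
        // flag Kafka as down so write() stops forwarding records. The producer
        // itself is closed later from an application thread, since close()
        // blocks until in-flight sends complete.
        static void stopProducerSendAndClose() {
            kafkaDown.set(true);
        }
    }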

My question is: is this the correct approach, or is there some other way
to stop sending messages (short of going "sync")?
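
(By going "sync" I mean blocking on the Future returned by each send,
roughly as below, which is what I would like to avoid:)

    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class SyncSend {
        // Block until the broker acks or the send fails, so errors surface
        // immediately, at the cost of stalling the calling thread.
        static RecordMetadata sendSync(KafkaProducer<byte[], byte[]> producer,
                                       ProducerRecord<byte[], byte[]> record)
                throws InterruptedException, ExecutionException {
            return producer.send(record).get();
        }
    }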

Andrew Stein
