Lost message in asynchronous kafka producer

2016-09-16 Thread Agostino Calamita
Hi,
I have 2 brokers with a topic with replication factor = 2.
Brokers are configured with min.insync.replicas=2.

I use a producer in asynchronous mode to send 10 messages.

A few seconds after the producer starts, I stop one broker.

On the producer side I get no exceptions, so as far as the producer
application is concerned everything is OK, but in the broker log I see
 ERROR [Replica Manager on Broker 0]: Error processing append operation on partition diameternew-10 (kafka.server.ReplicaManager)

So all the messages sent after the broker shutdown were lost.

Is there a way to catch this kind of exception as soon as possible?

I know that I can use

 m_kafkaProducer.send(prMessage).get();

or

Future _future = m_kafkaProducer.send(prMessage);

or a callback in the send method, but they are very slow.
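
For reference, a callback does not have to block the sending thread: in
the Java client it is invoked asynchronously when the request completes.
The following is only a minimal sketch, not code from this thread. It
records failures reported to a completion callback so the application can
check for them cheaply between sends. The method mirrors the shape of the
client's Callback.onCompletion(RecordMetadata, Exception);
AsyncSendErrorTracker and fakeSend are hypothetical names, and fakeSend
merely simulates a broker so that the sketch runs without Kafka.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

public class AsyncSendErrorTracker {
    // First failure reported by any callback; stays null while all sends succeed.
    private final AtomicReference<Exception> firstError = new AtomicReference<>();
    private final AtomicInteger failed = new AtomicInteger();

    // Same shape as Kafka's Callback.onCompletion(metadata, exception):
    // exception is null on success and non-null on failure.
    public void onCompletion(Object metadata, Exception exception) {
        if (exception != null) {
            failed.incrementAndGet();
            firstError.compareAndSet(null, exception);
        }
    }

    public int failedCount() {
        return failed.get();
    }

    // Non-blocking check that the application can call between sends.
    public void throwIfFailed() throws Exception {
        Exception e = firstError.get();
        if (e != null) {
            throw e;
        }
    }

    // Hypothetical stand-in for an asynchronous send (assumption, not Kafka API).
    public static void fakeSend(boolean brokerUp, BiConsumer<Object, Exception> callback) {
        if (brokerUp) {
            callback.accept("ack", null);
        } else {
            callback.accept(null, new IllegalStateException("not enough in-sync replicas"));
        }
    }

    public static void main(String[] args) {
        AsyncSendErrorTracker tracker = new AsyncSendErrorTracker();
        for (int i = 0; i < 10; i++) {
            // The simulated broker "goes down" after the first five messages.
            fakeSend(i < 5, tracker::onCompletion);
        }
        System.out.println("failed sends: " + tracker.failedCount());
        try {
            tracker.throwIfFailed();
        } catch (Exception e) {
            System.out.println("first error: " + e.getMessage());
        }
    }
}
```

With a real producer this would presumably be wired up as
m_kafkaProducer.send(prMessage, (metadata, e) -> tracker.onCompletion(metadata, e)),
assuming Java 8 lambdas are available.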


Thanks.


No error to kafka-producer on broker shutdown

2016-09-09 Thread Agostino Calamita
Hi,
I'm writing a little test to check Kafka high availability, with 2 brokers,
1 topic with replication factor = 2 and min.insync.replicas=2.
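
The producer properties used by the test are not shown in this message.
As a hedged sketch only, properties along the following lines would make
the producer wait for all in-sync replicas, which is the case in which
min.insync.replicas is enforced. The broker addresses and the class name
ProducerConfigSketch are hypothetical, and the byte-array serializers are
an assumption based on the getBytes() payload in the test.

```java
import java.util.Properties;

public class ProducerConfigSketch {
    public static Properties build() {
        Properties props = new Properties();
        // Hypothetical broker addresses; replace with the real ones.
        props.put("bootstrap.servers", "broker0:9092,broker1:9092");
        // Wait for all in-sync replicas; min.insync.replicas applies to acks=all.
        props.put("acks", "all");
        // Assumed serializers: the test sends a byte[] value.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("acks=" + build().getProperty("acks"));
    }
}
```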

This is the test:

 System.out.println("Building KafkaProducer...");

 KafkaProducer m_kafkaProducer = new KafkaProducer(propsProducer);

 System.out.println("Building ProducerRecord...");

 ProducerRecord prMessage = new ProducerRecord(strTopic, jsonInString.getBytes());

 long now = System.currentTimeMillis();

 for (int i = 0; i < 3; i++)
 {
     try {
         // send() is asynchronous: it returns before the broker acknowledges the record
         for (int x = 1; x <= numMessages; x++)
             m_kafkaProducer.send(prMessage);

         System.out.println("Wait for 60 seconds");

         Thread.sleep(60000);  // 60 seconds, in milliseconds

     } catch (Exception e)
     {
         System.out.println("Error sending message : " + e.getMessage());
     }
 }

  . . .

While the test is running, after the first iteration of the "for" loop, I
kill one broker, so only one broker remains alive.
When the test executes the second and third iterations, no errors are
caught by the producer; I only see errors in the Kafka broker logs. The
test terminates successfully, although the messages were not really sent.

As a result, my application, which uses the "async" producer, is not able
to detect the invalid state of the Kafka brokers.

Is there a way to catch this kind of error on the producer side?
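
One possible approach, sketched here under stated assumptions rather than
taken from the test, is to keep the Future returned by each send and
inspect the whole batch after the inner loop, so that sending stays
asynchronous while failures still surface once per iteration.
BatchFutureCheck, fakeSend and countFailures are hypothetical names, and
fakeSend stands in for producer.send() so that the sketch runs without a
broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class BatchFutureCheck {
    // Hypothetical stand-in for producer.send(record) (assumption, not Kafka API).
    public static Future<String> fakeSend(int i, boolean brokerUp) {
        CompletableFuture<String> f = new CompletableFuture<>();
        if (brokerUp) {
            f.complete("offset-" + i);
        } else {
            f.completeExceptionally(new IllegalStateException("not enough in-sync replicas"));
        }
        return f;
    }

    // Waits for every future in the batch and returns how many of them failed.
    public static int countFailures(List<Future<String>> batch) throws InterruptedException {
        int failures = 0;
        for (Future<String> f : batch) {
            try {
                f.get();
            } catch (ExecutionException e) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) throws InterruptedException {
        for (int cycle = 0; cycle < 3; cycle++) {
            // The simulated broker is only available during the first cycle.
            boolean brokerUp = (cycle == 0);
            List<Future<String>> batch = new ArrayList<>();
            for (int x = 1; x <= 10; x++) {
                batch.add(fakeSend(x, brokerUp));
            }
            System.out.println("cycle " + cycle + ": " + countFailures(batch) + " failed");
        }
    }
}
```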

Thanks.