Hi,
I'm writing a small test to check Kafka high availability, with 2 brokers
and 1 topic with replication factor = 2 and min.insync.replicas=2.
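
For context on the producer side: min.insync.replicas is only enforced by
the broker for producers that send with acks=all. The contents of
propsProducer are not shown in this post, so the following is only a
minimal sketch of what such a configuration might look like. The broker
addresses and the class name are hypothetical placeholders.

```java
import java.util.Properties;

public class ProducerPropsSketch {

    // Builds producer properties for a setup like the one described above.
    // Assumption: broker1:9092 and broker2:9092 are placeholder addresses.
    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        // acks=all makes the leader wait for the in-sync replicas, which is
        // when the topic's min.insync.replicas setting is checked.
        props.put("acks", "all");
        // Serializers matching the byte[] key and value types of the test.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration for inspection.
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```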

This is the test:

    System.out.println("Building KafkaProducer...");

    KafkaProducer<byte[], byte[]> m_kafkaProducer =
            new KafkaProducer<byte[], byte[]>(propsProducer);

    System.out.println("Building ProducerRecord...");

    ProducerRecord<byte[], byte[]> prMessage =
            new ProducerRecord<byte[], byte[]>(strTopic, jsonInString.getBytes());

    long now = System.currentTimeMillis();

    ....

    for (int i = 0; i < 3; i++) {
        try {
            for (int x = 1; x <= numMessages; x++)
                m_kafkaProducer.send(prMessage);

            System.out.println("Wait for 60 seconds");

            Thread.sleep(60000);
        } catch (Exception e) {
            System.out.println("Error sending message : " + e.getMessage());
        }
    }

    . . .

While the test is running, after the first iteration of the outer "for"
loop, I kill one broker, so only one broker remains alive.
When the test executes the second and third iterations, no errors are
caught by the Kafka producer; I see errors only in the Kafka broker logs.
The test terminates successfully, even though the messages were not
actually sent.

As a result, my application, which uses the "async" producer, is not able
to detect the invalid state of the Kafka brokers.

Is there a way to catch this kind of error on the Kafka producer side?
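
One direction that may help here: KafkaProducer.send() is asynchronous and
returns a Future, and it also has an overload that takes a Callback, so a
broker-side failure is generally reported through those rather than as an
exception thrown inside the try block. The sketch below illustrates the
idea using only the JDK. AsyncSendSketch.send is a hypothetical stand-in
for the producer, not the Kafka API, and the error message is invented for
illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

public class AsyncSendSketch {

    // Hypothetical stand-in for an asynchronous producer send: it returns
    // immediately and reports the outcome later, on another thread.
    static CompletableFuture<Long> send(String record, boolean brokersHealthy,
                                        BiConsumer<Long, Exception> callback) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        CompletableFuture.runAsync(() -> {
            if (brokersHealthy) {
                callback.accept(0L, null);
                result.complete(0L);
            } else {
                Exception error =
                        new IllegalStateException("not enough in-sync replicas");
                callback.accept(null, error);
                result.completeExceptionally(error);
            }
        });
        return result;
    }

    // Sends numMessages records and returns how many failed.
    static int countFailures(int numMessages, boolean brokersHealthy) {
        AtomicInteger failures = new AtomicInteger();
        List<CompletableFuture<Long>> pending = new ArrayList<>();
        try {
            for (int x = 1; x <= numMessages; x++) {
                // The callback sees the asynchronous error, if any.
                pending.add(send("message " + x, brokersHealthy, (offset, error) -> {
                    if (error != null) failures.incrementAndGet();
                }));
            }
        } catch (Exception e) {
            // Not reached here: send() returns before the outcome is known.
            System.out.println("Error sending message : " + e.getMessage());
        }
        // Waiting on each future also surfaces the failure to the caller.
        for (CompletableFuture<Long> f : pending) {
            try {
                f.get();
            } catch (ExecutionException e) {
                // Already counted by the callback above.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("failures with healthy brokers: " + countFailures(3, true));
        System.out.println("failures with one broker down: " + countFailures(3, false));
    }
}
```

With the real client, the equivalent would presumably be to pass a Callback
to send(), or to call get() on the returned Future, and check the exception
there.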

Thanks.
