That tells you that the acknowledgements (which in your case you have configured to require ACKs from all brokers in the ISR) aren't happening, and that can essentially mean the records aren't making it to the topic. How many brokers do you have? What is the replication factor on the topic, and which brokers are in the ISR for it? Ultimately, you have to figure out why these ACKs aren't happening.
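(Editor's note: one way to answer the replication-factor and ISR questions programmatically is a sketch like the following, using the AdminClient API introduced in Kafka 0.11, which postdates this thread; `<broker_list>` and `<topic_name>` are placeholders to fill in. A healthy topic has the ISR size equal to the replica count on every partition.)

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

public class DescribeTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "<broker_list>"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin
                    .describeTopics(Collections.singletonList("<topic_name>"))
                    .all().get()
                    .get("<topic_name>");
            // If isr < replicas on a partition, acks=all cannot be satisfied
            // by the missing replicas, which can surface as send timeouts.
            for (TopicPartitionInfo p : desc.partitions()) {
                System.out.printf("partition %d: replicas=%d, isr=%d%n",
                        p.partition(), p.replicas().size(), p.isr().size());
            }
        }
    }
}
```

The same information is available from the kafka-topics describe tool if you prefer the command line.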

-Jaikiran

On Tuesday 22 November 2016 02:26 PM, Phadnis, Varun wrote:
Hello,

We had tried that... If future.get() is added in the while loop, it takes too long for the loop to execute.

Last time we tried it, it ran on that file for over 2 hours and still had not finished.
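(Editor's note: a middle ground not proposed in the thread is to keep send() non-blocking but register a callback per record and flush() once at the end, so failures are counted without waiting on each record. This is a sketch; the broker list, topic name, and the failureCount variable are illustrative.)

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class NonBlockingSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "<broker_list>"); // placeholder
        props.put("key.serializer", StringSerializer.class.getCanonicalName());
        props.put("value.serializer", StringSerializer.class.getCanonicalName());
        props.put("acks", "all");
        props.put("retries", 100);

        AtomicLong failureCount = new AtomicLong(); // illustrative name
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1_000_000; i++) {
                producer.send(new ProducerRecord<>("<topic_name>", "line-" + i),
                        (metadata, exception) -> {
                            // Runs on the producer I/O thread once the record is
                            // acked or has exhausted its retries.
                            if (exception != null) {
                                failureCount.incrementAndGet();
                            }
                        });
            }
            // Block once, until every buffered record has been resolved.
            producer.flush();
        }
        System.out.println("Failed sends: " + failureCount.get());
    }
}
```

This keeps throughput close to the fire-and-forget version while still making every failed ack visible.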

Regards,
Varun

-----Original Message-----
From: Jaikiran Pai [mailto:jai.forums2...@gmail.com]
Sent: 22 November 2016 02:20
To: users@kafka.apache.org
Subject: Re: Kafka producer dropping records

The KafkaProducer.send returns a Future<RecordMetadata>. What happens when you 
add a future.get() on the returned Future, in that while loop, for each sent record?
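(Editor's note: a sketch of that suggestion as a helper around the posted loop body. Blocking on get() per record is slow, as noted later in the thread, but the ExecutionException it throws carries the actual reason a record could not be acked. The sendAndWait name is illustrative.)

```java
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendAndWait {
    // Call this from the while loop instead of the bare producer.send(...).
    static void sendAndWait(KafkaProducer<String, String> producer,
                            String topicName, String line)
            throws InterruptedException {
        try {
            RecordMetadata metadata =
                    producer.send(new ProducerRecord<>(topicName, line)).get();
            // metadata.partition() and metadata.offset() confirm where it landed.
        } catch (ExecutionException e) {
            // e.getCause() is the real failure, e.g. a TimeoutException.
            System.err.println("Send failed: " + e.getCause());
        }
    }
}
```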

-Jaikiran

On Tuesday 22 November 2016 12:45 PM, Phadnis, Varun wrote:
Hello,

We have the following piece of code where we read lines from a file and push 
them to a Kafka topic :

          Properties properties = new Properties();
          properties.put("bootstrap.servers", <bootstrapServers>);
          properties.put("key.serializer", StringSerializer.class.getCanonicalName());
          properties.put("value.serializer", StringSerializer.class.getCanonicalName());
          properties.put("retries", 100);
          properties.put("acks", "all");

          KafkaProducer<Object, String> producer = new KafkaProducer<>(properties);

          try (BufferedReader bf = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), "UTF-8"))) {
              String line;
              int count = 0;
              while ((line = bf.readLine()) != null) {
                  count++;
                  producer.send(new ProducerRecord<>(topicName, line));
              }
              Logger.log("Done producing data messages. Total no of records produced: " + count);
          } catch (InterruptedException | ExecutionException | IOException e) {
              Throwables.propagate(e);
          } finally {
              producer.close();
          }

When we try this with a large file with a million records, only about half of them (around 500,000) get written to the topic. In the above example, I verified this by running the GetOffset tool after a fair amount of time (to ensure all records had finished processing) as follows:


          ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker_list> --time -1 --topic <topic_name>

The output of this was:

          topic_name:1:292954
          topic_name:0:296787

What could be causing this dropping of records?

Thanks,
Varun

