Hello,

We had tried that... If future.get() is added in the while loop, it takes too 
long for the loop to execute. 

Last time we tried it, it was running for that file for over 2 hours and still 
not finished.

Regards,
Varun

-----Original Message-----
From: Jaikiran Pai [mailto:jai.forums2...@gmail.com] 
Sent: 22 November 2016 02:20
To: users@kafka.apache.org
Subject: Re: Kafka producer dropping records

The KafkaProducer.send returns a Future<RecordMetadata>. What happens when you 
add a future.get() on the returned Future, in that while loop, for each sent 
record?

-Jaikiran

On Tuesday 22 November 2016 12:45 PM, Phadnis, Varun wrote:
> Hello,
>
> We have the following piece of code where we read lines from a file and push 
> them to a Kafka topic :
>
>          Properties properties = new Properties();
>          properties.put("bootstrap.servers", <bootstrapServers>);
>          properties.put("key.serializer", 
> StringSerializer.class.getCanonicalName());
>          properties.put("value.serializer", 
> StringSerializer.class.getCanonicalName());
>          properties.put("retries",100);
>         properties.put("acks", "all");
>
>         KafkaProducer<Object, String> producer =  new 
> KafkaProducer<>(properties);
>
>          try (BufferedReader bf = new BufferedReader(new 
> InputStreamReader(new FileInputStream(filePath), "UTF-8"))) {
>              String line;
>              int count = 0;
>              while ((line = bf.readLine()) != null) {
>                  count++;
>                  producer.send(new ProducerRecord<>(topicName, line));
>              }
>              Logger.log("Done producing data messages. Total no of records 
> produced:" + count);
>          } catch (InterruptedException | ExecutionException | IOException e) {
>              Throwables.propagate(e);
>          } finally {
>              producer.close();
>          }
>
> When we try this with a large file with a million records, only half of them 
> around 500,000 get written to the topic. In the above example, I verified 
> this by running the GetOffset tool after fair amount of time (to ensure all 
> records had finished processing) as follows:
>
>
>          ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 
> <broker_list> --time -1 --topic <topic_name>
>
>
>
>
> The output of this was :
>
>
>          topic_name:1:292954
>
>          topic_name:0:296787
>
>
> What could be causing this dropping of records?
>
> Thanks,
> Varun
>

Reply via email to