Hi,
I am reading a file and dumping each record on Kafka. Here is my producer code:
public void produce(String topicName, String filePath, String bootstrapServers,
String encoding) {
try (BufferedReader bf = getBufferedReader(filePath, encoding);
KafkaProducer<Object, String> producer =
initKafkaProducer(bootstrapServers)) {
String line;
while ((line = bf.readLine()) != null) {
producer.send(new
ProducerRecord<>(topicName, line), (metadata, e) -> {
if (e != null) {
e.printStackTrace();
}
});
}
producer.flush();
} catch (IOException e) {
Throwables.propagate(e);
}
}
private static KafkaProducer<Object, String> initKafkaProducer(String
bootstrapServer) {
Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServer);
properties.put("key.serializer",
StringSerializer.class.getCanonicalName());
properties.put("value.serializer",
StringSerializer.class.getCanonicalName());
properties.put("acks", "-1");
properties.put("retries", 10);
return new KafkaProducer<>(properties);
}
private BufferedReader getBufferedReader(String filePath, String encoding)
throws UnsupportedEncodingException, FileNotFoundException {
return new BufferedReader(new InputStreamReader(new
FileInputStream(filePath), Optional.ofNullable(encoding).orElse("UTF-8")));
}
As per the official documentation of
Callback<https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>,
TimeoutException is a retriable exception. As I have kept retries 10, producer
will try to resend the message if delivering some message fails with
TimeoutException. I am looking for some reliable to way to detect when delivery
of a message is failed permanently after all retries.
Regards,
Vatsal