Vatsal: I don't think they merged the fix for this bug (retries doesn't work) in 0.9.x to 0.10.0.1: https://github.com/apache/kafka/pull/1547
On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal <mev...@sky.optymyze.com> wrote: > Hello, > > Bumping up this thread in case anyone of you have any say on this issue. > > Regards, > Vatsal > > -----Original Message----- > From: Mevada, Vatsal > Sent: 02 December 2016 16:16 > To: Kafka Users <users@kafka.apache.org> > Subject: RE: Detecting when all the retries are expired for a message > > I executed the same producer code for a single record file with following > config: > > properties.put("bootstrap.servers", bootstrapServer); > properties.put("key.serializer", > StringSerializer.class.getCanonicalName()); > properties.put("value.serializer", > StringSerializer.class.getCanonicalName()); > properties.put("acks", "-1"); > properties.put("retries", 50000); > properties.put("request.timeout.ms", 1); > > I have kept request.timeout.ms=1 to make sure that message delivery will > fail with TimeoutException. Since the retries are 50000 then the program > should take at-least 50000 ms (50 seconds) to complete for single record. > However the program is completing almost instantly with only one callback > with TimeoutException. I suspect that producer is not going for any > retries. Or am I missing something in my code? > > My Kafka version is 0.10.0.1. > > Regards, > Vatsal > Am I missing any configuration or > -----Original Message----- > From: Ismael Juma [mailto:isma...@gmail.com] > Sent: 02 December 2016 13:30 > To: Kafka Users <users@kafka.apache.org> > Subject: RE: Detecting when all the retries are expired for a message > > The callback is called after the retries have been exhausted. > > Ismael > > On 2 Dec 2016 3:34 am, "Mevada, Vatsal" <mev...@sky.optymyze.com> wrote: > > > @Ismael: > > > > I can handle TimeoutException in the callback. However as per the > > documentation of Callback(link: https://kafka.apache.org/0100/ > > javadoc/org/apache/kafka/clients/producer/Callback.html), > > TimeoutException is a retriable exception and it says that it "may be > > covered by increasing #.retries". So even if I get TimeoutException in > > callback, wouldn't it try to send message again until all the retries > > are done? Would it be safe to assume that message delivery is failed > > permanently just by encountering TimeoutException in callback? > > > > Here is a snippet from above mentioned documentation: > > "exception - The exception thrown during processing of this record. > > Null if no error occurred. Possible thrown exceptions include: > > Non-Retriable exceptions (fatal, the message will never be sent): > > InvalidTopicException OffsetMetadataTooLargeException > > RecordBatchTooLargeException RecordTooLargeException > > UnknownServerException Retriable exceptions (transient, may be covered > > by increasing #.retries): CorruptRecordException > > InvalidMetadataException NotEnoughReplicasAfterAppendException > > NotEnoughReplicasException OffsetOutOfRangeException TimeoutException > > UnknownTopicOrPartitionException" > > > > @asaf :My kafka - API version is 0.10.0.1. So I think I should not > > face the issue that you are mentioning. I mentioned documentation link > > of 0.9 by mistake. > > > > Regards, > > Vatsal > > -----Original Message----- > > From: Asaf Mesika [mailto:asaf.mes...@gmail.com] > > Sent: 02 December 2016 00:32 > > To: Kafka Users <users@kafka.apache.org> > > Subject: Re: Detecting when all the retries are expired for a message > > > > There's a critical bug in that section that has only been fixed in > > 0.9.0.2 which has not been release yet. Without the fix it doesn't > really retry. > > I forked the kafka repo, applied the fix, built it and placed it in > > our own Nexus Maven repository until 0.9.0.2 will be released. > > > > https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio > > > > Feel free to use it. > > > > On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma <ism...@juma.me.uk> wrote: > > > > > The callback should give you what you are asking for. Has it not > > > worked as you expect when you tried it? > > > > > > Ismael > > > > > > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal > > > <mev...@sky.optymyze.com> > > > wrote: > > > > > > > Hi, > > > > > > > > > > > > > > > > I am reading a file and dumping each record on Kafka. Here is my > > > > producer > > > > code: > > > > > > > > > > > > > > > > public void produce(String topicName, String filePath, String > > > > bootstrapServers, String encoding) { > > > > > > > > try (BufferedReader bf = > > > > getBufferedReader(filePath, encoding); > > > > > > > > KafkaProducer<Object, String> > > > > producer = > > > > initKafkaProducer(bootstrapServers)) { > > > > > > > > String line; > > > > > > > > while ((line = bf.readLine()) != > > > > null) { > > > > > > > > producer.send(new > > > > ProducerRecord<>(topicName, line), (metadata, e) -> { > > > > > > > > if > > > > (e != > > > > null) { > > > > > > > > > > > > e.printStackTrace(); > > > > > > > > } > > > > > > > > }); > > > > > > > > } > > > > > > > > producer.flush(); > > > > > > > > } catch (IOException e) { > > > > > > > > Throwables.propagate(e); > > > > > > > > } > > > > > > > > } > > > > > > > > > > > > > > > > private static KafkaProducer<Object, String> > > > > initKafkaProducer(String > > > > bootstrapServer) { > > > > > > > > Properties properties = new Properties(); > > > > > > > > properties.put("bootstrap.servers", > > > > bootstrapServer); > > > > > > > > properties.put("key.serializer", > > StringSerializer.class. > > > > getCanonicalName()); > > > > > > > > properties.put("value.serializer", > > > StringSerializer.class. > > > > getCanonicalName()); > > > > > > > > properties.put("acks", "-1"); > > > > > > > > properties.put("retries", 10); > > > > > > > > return new KafkaProducer<>(properties); > > > > > > > > } > > > > > > > > > > > > > > > > private BufferedReader getBufferedReader(String filePath, String > > > encoding) > > > > throws UnsupportedEncodingException, FileNotFoundException { > > > > > > > > return new BufferedReader(new > > > > InputStreamReader(new FileInputStream(filePath), > Optional.ofNullable(encoding). > > > > orElse("UTF-8"))); > > > > > > > > } > > > > > > > > > > > > > > > > As per the official documentation of Callback<https://kafka.apache. > > > > org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>, > > > > TimeoutException is a retriable exception. As I have kept retries > > > > 10, producer will try to resend the message if delivering some > > > > message fails with TimeoutException. I am looking for some > > > > reliable to way to detect > > > when > > > > delivery of a message is failed permanently after all retries. > > > > > > > > > > > > > > > > Regards, > > > > > > > > Vatsal > > > > > > > > > >