@Asaf
Do I need to raise a new bug for this?
@Rajini
Please suggest a configuration with which you think retries should work.
The code is already in the mail chain; I am adding it here again:
public void produce(String topicName, String filePath, String bootstrapServers,
        String encoding) {
    try (BufferedReader bf = getBufferedReader(filePath, encoding);
            KafkaProducer<Object, String> producer =
                    initKafkaProducer(bootstrapServers)) {
        String line;
        while ((line = bf.readLine()) != null) {
            producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
                if (e != null) {
                    e.printStackTrace();
                }
            });
        }
        producer.flush();
    } catch (IOException e) {
        Throwables.propagate(e);
    }
}

private static KafkaProducer<Object, String> initKafkaProducer(String bootstrapServer) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", bootstrapServer);
    properties.put("key.serializer", StringSerializer.class.getCanonicalName());
    properties.put("value.serializer", StringSerializer.class.getCanonicalName());
    properties.put("acks", "-1");
    properties.put("retries", 50000);
    properties.put("request.timeout.ms", 1);
    return new KafkaProducer<>(properties);
}

private BufferedReader getBufferedReader(String filePath, String encoding)
        throws UnsupportedEncodingException, FileNotFoundException {
    return new BufferedReader(new InputStreamReader(
            new FileInputStream(filePath),
            Optional.ofNullable(encoding).orElse("UTF-8")));
}
Regards,
Vatsal
-----Original Message-----
From: Rajini Sivaram [mailto:[email protected]]
Sent: 06 December 2016 17:27
To: [email protected]
Subject: Re: Detecting when all the retries are expired for a message
I believe batches in RecordAccumulator are expired after request.timeout.ms, so
they wouldn't get retried in this case. I think the config options are quite
confusing, making it hard to figure out the behavior without looking into the
code.
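
To illustrate the interaction described above, here is a minimal sketch of a producer configuration that leaves room for retries, assuming batches really are expired after request.timeout.ms. The class name RetryFriendlyConfig, the build helper, and the concrete values (10 retries, 500 ms backoff, 30 s request timeout) are placeholders chosen for illustration, not values recommended anywhere in this thread. Only the property keys are real producer settings.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Kafka API.
class RetryFriendlyConfig {

    // Builds producer properties with a request timeout long enough that a
    // batch is not expired before the first attempt can complete.
    static Properties build(String bootstrapServers) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        p.put("acks", "-1");                 // wait for all in-sync replicas
        p.put("retries", 10);                // illustrative value
        p.put("retry.backoff.ms", 500);      // pause between attempts (illustrative)
        p.put("request.timeout.ms", 30000);  // far above the 1 ms used in the test
        return p;
    }

    public static void main(String[] args) {
        // Print the resulting settings; no broker connection is made here.
        build("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object can be passed to the KafkaProducer constructor in place of the one built in initKafkaProducer. Whether retries then behave as expected on 0.10.0.1 still has to be checked against a real broker.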
On Tue, Dec 6, 2016 at 10:10 AM, Asaf Mesika
<[email protected]<mailto:[email protected]>> wrote:
> Vatsal:
>
> I don't think the fix for this bug in 0.9.x (retries don't work) was
> merged into 0.10.0.1: https://github.com/apache/kafka/pull/1547
>
>
> On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal
> <[email protected]<mailto:[email protected]>>
> wrote:
>
> > Hello,
> >
> > Bumping this thread in case any of you has input on this issue.
> >
> > Regards,
> > Vatsal
> >
> > -----Original Message-----
> > From: Mevada, Vatsal
> > Sent: 02 December 2016 16:16
> > To: Kafka Users <[email protected]<mailto:[email protected]>>
> > Subject: RE: Detecting when all the retries are expired for a
> > message
> >
> > I executed the same producer code for a single-record file with the
> > following config:
> >
> > properties.put("bootstrap.servers", bootstrapServer);
> > properties.put("key.serializer",
> > StringSerializer.class.getCanonicalName());
> > properties.put("value.serializer",
> > StringSerializer.class.getCanonicalName());
> > properties.put("acks", "-1");
> > properties.put("retries", 50000);
> > properties.put("request.timeout.ms", 1);
> >
> > I have kept request.timeout.ms=1 to make sure that message delivery
> > will fail with a TimeoutException. Since retries is set to 50000, and
> > each attempt should take at least 1 ms to time out, the program should
> > take at least 50000 ms (50 seconds) to complete for a single record.
> > However, the program completes almost instantly, with only one
> > callback carrying a TimeoutException. I suspect that the producer is
> > not attempting any retries. Or am I missing something in my code?
> >
> > My Kafka version is 0.10.0.1.
> >
> > Regards,
> > Vatsal
> > -----Original Message-----
> > From: Ismael Juma [mailto:[email protected]]
> > Sent: 02 December 2016 13:30
> > To: Kafka Users <[email protected]<mailto:[email protected]>>
> > Subject: RE: Detecting when all the retries are expired for a
> > message
> >
> > The callback is called after the retries have been exhausted.
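
If the callback does fire only after retries are exhausted, as stated above, then any exception seen in the callback can be treated as a final failure for that record. A minimal sketch of how such failures might be collected is shown below. DeliveryTracker and its methods are hypothetical names introduced for illustration and are not part of the Kafka API; the class only needs to be called from the send callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of the Kafka API.
class DeliveryTracker {
    // Callbacks run on the producer's I/O thread, so use thread-safe state.
    private final Queue<String> failed = new ConcurrentLinkedQueue<>();
    private final AtomicLong delivered = new AtomicLong();

    // Intended to be called from the send callback, for example:
    //   producer.send(record, (metadata, e) -> tracker.onCompletion(line, e));
    void onCompletion(String line, Exception e) {
        if (e == null) {
            delivered.incrementAndGet();
        } else {
            // Assumption: the producer has already given up on this record,
            // so it is recorded as permanently failed.
            failed.add(line);
        }
    }

    long deliveredCount() {
        return delivered.get();
    }

    // Snapshot of the lines whose delivery failed.
    List<String> failedLines() {
        return new ArrayList<>(failed);
    }
}
```

After producer.flush() returns, failedLines() would hold the records to re-send or log, under the assumption stated above.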
> >
> > Ismael
> >
> > On 2 Dec 2016 3:34 am, "Mevada, Vatsal"
> > <[email protected]<mailto:[email protected]>> wrote:
> >
> > > @Ismael:
> > >
> > > I can handle TimeoutException in the callback. However, according to
> > > the documentation of Callback (link:
> > > https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/Callback.html),
> > > TimeoutException is a retriable exception and it says that it "may
> > > be covered by increasing #.retries". So even if I get a
> > > TimeoutException in the callback, wouldn't the producer try to send
> > > the message again until all the retries are done? Would it be safe
> > > to assume that message delivery has failed permanently just because
> > > a TimeoutException was encountered in the callback?
> > >
> > > Here is a snippet from the above-mentioned documentation:
> > > "exception - The exception thrown during processing of this record.
> > > Null if no error occurred. Possible thrown exceptions include:
> > >
> > > Non-Retriable exceptions (fatal, the message will never be sent):
> > >   InvalidTopicException
> > >   OffsetMetadataTooLargeException
> > >   RecordBatchTooLargeException
> > >   RecordTooLargeException
> > >   UnknownServerException
> > >
> > > Retriable exceptions (transient, may be covered by increasing
> > > #.retries):
> > >   CorruptRecordException
> > >   InvalidMetadataException
> > >   NotEnoughReplicasAfterAppendException
> > >   NotEnoughReplicasException
> > >   OffsetOutOfRangeException
> > >   TimeoutException
> > >   UnknownTopicOrPartitionException"
> > >
> > > @Asaf: My Kafka API version is 0.10.0.1, so I think I should not
> > > face the issue that you are mentioning. I linked to the 0.9
> > > documentation by mistake.
> > >
> > > Regards,
> > > Vatsal
> > > -----Original Message-----
> > > From: Asaf Mesika [mailto:[email protected]]
> > > Sent: 02 December 2016 00:32
> > > To: Kafka Users <[email protected]<mailto:[email protected]>>
> > > Subject: Re: Detecting when all the retries are expired for a
> > > message
> > >
> > > There's a critical bug in that section that has only been fixed in
> > > 0.9.0.2, which has not been released yet. Without the fix it doesn't
> > > really retry.
> > > I forked the Kafka repo, applied the fix, built it and placed it
> > > in our own Nexus Maven repository until 0.9.0.2 is released.
> > >
> > > https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio
> > >
> > > Feel free to use it.
> > >
> > > On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma
> > > <[email protected]<mailto:[email protected]>> wrote:
> > >
> > > > The callback should give you what you are asking for. Has it not
> > > > worked as you expect when you tried it?
> > > >
> > > > Ismael
> > > >
> > > > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal
> > > > <[email protected]<mailto:[email protected]>>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > >
> > > > >
> > > > > I am reading a file and dumping each record on Kafka. Here is
> > > > > my producer code:
> > > > >
> > > > >
> > > > >
> > > > > public void produce(String topicName, String filePath,
> > > > >         String bootstrapServers, String encoding) {
> > > > >     try (BufferedReader bf = getBufferedReader(filePath, encoding);
> > > > >             KafkaProducer<Object, String> producer =
> > > > >                     initKafkaProducer(bootstrapServers)) {
> > > > >         String line;
> > > > >         while ((line = bf.readLine()) != null) {
> > > > >             producer.send(new ProducerRecord<>(topicName, line),
> > > > >                     (metadata, e) -> {
> > > > >                 if (e != null) {
> > > > >                     e.printStackTrace();
> > > > >                 }
> > > > >             });
> > > > >         }
> > > > >         producer.flush();
> > > > >     } catch (IOException e) {
> > > > >         Throwables.propagate(e);
> > > > >     }
> > > > > }
> > > > >
> > > > > private static KafkaProducer<Object, String> initKafkaProducer(
> > > > >         String bootstrapServer) {
> > > > >     Properties properties = new Properties();
> > > > >     properties.put("bootstrap.servers", bootstrapServer);
> > > > >     properties.put("key.serializer",
> > > > >             StringSerializer.class.getCanonicalName());
> > > > >     properties.put("value.serializer",
> > > > >             StringSerializer.class.getCanonicalName());
> > > > >     properties.put("acks", "-1");
> > > > >     properties.put("retries", 10);
> > > > >     return new KafkaProducer<>(properties);
> > > > > }
> > > > >
> > > > > private BufferedReader getBufferedReader(String filePath,
> > > > >         String encoding)
> > > > >         throws UnsupportedEncodingException, FileNotFoundException {
> > > > >     return new BufferedReader(new InputStreamReader(
> > > > >             new FileInputStream(filePath),
> > > > >             Optional.ofNullable(encoding).orElse("UTF-8")));
> > > > > }
> > > > >
> > > > >
> > > > >
> > > > > As per the official documentation of Callback
> > > > > <https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>,
> > > > > TimeoutException is a retriable exception. Since I have set
> > > > > retries to 10, the producer will try to resend a message if
> > > > > delivering it fails with a TimeoutException. I am looking for a
> > > > > reliable way to detect when delivery of a message has failed
> > > > > permanently after all retries.
> > > > >
> > > > >
> > > > >
> > > > > Regards,
> > > > >
> > > > > Vatsal
> > > > >
> > > >
> > >
> >
>
--
Regards,
Rajini