Re: Kafka producer dropping records

2016-11-25 Thread Ismael Juma
Hi Varun,

You could increase `retries`, but it seems you have already configured it to
be `100`. Another option is to increase `retry.backoff.ms`, which will
increase the time between retries.

Ismael
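
A minimal sketch of that configuration, mirroring the properties from the original post (the broker address is a placeholder, and the backoff value is an arbitrary choice to tune; `retry.backoff.ms` defaults to 100 ms in the 0.10.x clients):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

// Sketch only: broker address and backoff value are placeholders.
Properties props = new Properties();
props.put("bootstrap.servers", "<broker list>");
props.put("key.serializer", StringSerializer.class.getCanonicalName());
props.put("value.serializer", StringSerializer.class.getCanonicalName());
props.put("acks", "all");
props.put("retries", 100);
props.put("retry.backoff.ms", 500);  // default is 100 ms; larger values ride out longer broker hiccups

KafkaProducer<Object, String> producer = new KafkaProducer<>(props);
```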


RE: Kafka producer dropping records

2016-11-25 Thread Phadnis, Varun
Hello,

Sorry for the late response. We tried logging the errors received in the
callback, and it turns out we are facing TimeoutExceptions:

org.apache.kafka.common.errors.TimeoutException: Batch containing 93 
record(s) expired due to timeout while requesting metadata from brokers for 
mp_test2-1

Increasing the request.timeout.ms=10 (from the default of 3) stopped the
messages from being dropped. However, that seems like a solution which would
not scale if an unpredictable "burst" of network slowness caused longer
delays.

Is there a better way to handle this? Is there any other producer/broker 
configuration I could tweak to increase the reliability of the producer? 

Thanks,
Varun


Re: Kafka producer dropping records

2016-11-22 Thread Ismael Juma
Another option, which is probably easier, is to pass a callback to `send` and
log errors.

Ismael
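
A sketch of what that might look like against the loop from the original post (`producer`, `topicName`, `line`, and `Logger` as in that snippet); the callback runs on the producer's I/O thread once the broker responds or the record expires:

```java
// Fire-and-forget, but with failures logged instead of silently dropped.
producer.send(new ProducerRecord<>(topicName, line), (metadata, exception) -> {
    if (exception != null) {
        Logger.log("Send failed: " + exception);
    }
});
```

The send loop stays non-blocking; only failed records cost a log line.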


Re: Kafka producer dropping records

2016-11-22 Thread Ismael Juma
You can collect the Futures and call `get` in batches. That would give you
access to the errors without blocking on each request.

Ismael
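
One way to sketch that pattern. Stdlib `CompletableFuture`s stand in here for the `Future`s that `producer.send` would return, so the batching logic itself is runnable without a broker; the batch size of 4 and the simulated failure are arbitrary choices for the demo:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class BatchedGetDemo {

    // Block on every future in the batch, count failures, and empty the batch.
    // With the Kafka producer, each future would come from producer.send(record).
    static int drain(List<Future<?>> batch) {
        int errors = 0;
        for (Future<?> f : batch) {
            try {
                f.get();
            } catch (InterruptedException | ExecutionException e) {
                errors++;  // a real producer loop would log e.getCause() here
            }
        }
        batch.clear();
        return errors;
    }

    public static void main(String[] args) {
        List<Future<?>> batch = new ArrayList<>();
        int failed = 0;
        for (int i = 0; i < 10; i++) {
            // Stand-in for producer.send(...); record 3 fails.
            CompletableFuture<Integer> f = new CompletableFuture<>();
            if (i == 3) {
                f.completeExceptionally(new RuntimeException("simulated send failure"));
            } else {
                f.complete(i);
            }
            batch.add(f);
            if (batch.size() == 4) {   // drain every 4 sends instead of every send
                failed += drain(batch);
            }
        }
        failed += drain(batch);        // drain the remainder
        System.out.println("failed=" + failed);
    }
}
```

Running this prints `failed=1`. The point is that the loop only blocks once per batch rather than once per record, so error visibility costs far less throughput than a per-record `get()`.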


Re: Kafka producer dropping records

2016-11-22 Thread Jaikiran Pai
That tells you that the acknowledgements (which, in your case, you have
configured to come from all brokers in the ISR) aren't happening, which
essentially means the records aren't making it to the topic. How many brokers
do you have? What is the replication factor on the topic, and which brokers
are in the ISR for it? Ultimately, you have to figure out why these ACKs
aren't happening.


-Jaikiran
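
Those questions can be answered with the 0.10.x-era admin tool; hosts and topic name below are placeholders:

```shell
# Each partition line shows its leader, replica set, and in-sync replicas (Isr).
./kafka-topics.sh --describe --zookeeper <zookeeper host>:2181 --topic <topic name>
```

If the Isr column lists fewer brokers than the replica set, acks=all requests can time out while the producer waits for replicas that are not keeping up.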



RE: Kafka producer dropping records

2016-11-22 Thread Phadnis, Varun
Hello,

We had tried that... If future.get() is added in the while loop, it takes too
long for the loop to execute.

Last time we tried it, it ran on that file for over 2 hours and had still not
finished.

Regards,
Varun



Re: Kafka producer dropping records

2016-11-22 Thread Jaikiran Pai
The KafkaProducer.send returns a Future<RecordMetadata>. What happens
when you add a future.get() on the returned Future, in that while loop,
for each sent record?


-Jaikiran
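
As a sketch against the loop from the original post (same `producer`, `topicName`, and `line`), blocking on each send surfaces the first error immediately, at the cost of one broker round trip per record:

```java
// Synchronous send: get() rethrows any send failure wrapped in an
// ExecutionException, so nothing can be dropped silently.
producer.send(new ProducerRecord<>(topicName, line)).get();
```

The per-record round trip is also why this serializes the producer and can be far too slow for a million-record file.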



Kafka producer dropping records

2016-11-21 Thread Phadnis, Varun
Hello,

We have the following piece of code where we read lines from a file and push
them to a Kafka topic:

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "<broker list>");
    properties.put("key.serializer", StringSerializer.class.getCanonicalName());
    properties.put("value.serializer", StringSerializer.class.getCanonicalName());
    properties.put("retries", 100);
    properties.put("acks", "all");

    KafkaProducer<Object, String> producer = new KafkaProducer<>(properties);

    try (BufferedReader bf = new BufferedReader(new InputStreamReader(new FileInputStream(filePath), "UTF-8"))) {
        String line;
        int count = 0;
        while ((line = bf.readLine()) != null) {
            count++;
            producer.send(new ProducerRecord<>(topicName, line));
        }
        Logger.log("Done producing data messages. Total no of records produced:" + count);
    } catch (InterruptedException | ExecutionException | IOException e) {
        Throwables.propagate(e);
    } finally {
        producer.close();
    }

When we try this with a large file with a million records, only about half of
them, around 500,000, get written to the topic. In the above example, I
verified this by running the GetOffset tool after a fair amount of time (to
ensure all records had finished processing), as follows:


./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <broker list> --time -1 --topic <topic name>




The output of this was:


topic_name:1:292954

topic_name:0:296787


What could be causing this dropping of records?

Thanks,
Varun