Producer is very slow in Kafka .8

2014-12-21 Thread Saurabh Minni
Hi,
I was trying out Kafka 0.8 and was using the PHP kafka (
https://github.com/salebab/phpkafka ) extension to produce from PHP.

Unfortunately, it does not reliably produce more than 3 messages per
second.

Was wondering if anyone can help me speed this up.

Thanks,
Saurabh


Re: Kafka 0.8.2 new producer blocking on metadata

2014-12-21 Thread Paul Pearcy
FYI, I bumped server to 0.8.2-beta and I don't hit the basic failure I
mentioned above, which is great.

I haven't been able to find confirmation in the docs, but from a past
conversation (
http://mail-archives.apache.org/mod_mbox/kafka-users/201408.mbox/%3c20140829174552.ga30...@jkoshy-ld.linkedin.biz%3E),
it seems that the 0.8.2 producer should be fully compatible with the 0.8.1.1
broker.

From everything I see running in a single-node config, the 0.8.2 Java
producer is effectively dead after a complete disconnect from the 0.8.1.1
broker.

Thanks,
Paul

On Sun, Dec 21, 2014 at 3:06 AM, Paul Pearcy  wrote:

> Sounds good.
>
> Yes, I'd want a guarantee that every future I get will always return the
> recordmeta or an exception eventually.
>
> Running into a similar issue with futures never returning with a pretty
> straightforward case:
> - Healthy producer/server setup
> - Stop the server
> - Send a message
> - Call get on the future and it never returns. Doesn't matter if the
> server is started again or remains stopped
>
> Stepping into the producer code, it appears that in Sender.run a response
> never comes back for the send while things are down and handleDisconnect is
> never called.
>
> These are basically the same tests I am running against 0.8.1.1 producer,
> but I could have some wires crossed, so would be curious if others see
> similar.
>
> Thanks,
> Paul
>
>
> On Fri, Dec 19, 2014 at 5:27 PM, Jay Kreps  wrote:
>
>> Yeah if you want to file a JIRA and post a patch for a new option it's
>> possible others would want it. Maybe something like
>>   pre.initialize.topics=x,y,z
>>   pre.initialize.timeout=x
>>
>> The metadata fetch timeout is a bug...that behavior is inherited from
>> Object.wait which defines zero to mean infinite but I think that is not
>> very intuitive. If you file a ticket on that we could just fix it. I think
>> being able to set 0 is actually useful for this case you are trying for.
>>
>> WRT stopping the producer, I think what you are saying is that you want
>> it to be the case that calling close() on the producer immediately fails
>> all outstanding requests with some exception, right?
>>
>> -Jay
>>
>> On Fri, Dec 19, 2014 at 1:55 PM, Paul Pearcy  wrote:
>> >
>> > Hi Jay,
>> >   I have implemented a wrapper around the producer to behave like I want
>> > it to. Where it diverges from the current 0.8.2 producer is that it
>> > accepts three new inputs:
>> > - A list of expected topics
>> > - A timeout value to init meta for those topics during producer creation
>> > - An option to blow up if we fail to init topic meta within some amount
>> > of time
>> >
>> > I also needed to set metadata.fetch.timeout.ms=1 (as 0 means it will
>> > block forever) and kick off a thread to do the topic metadata init in
>> > the background.
>> >
>> > On the send side, things do fail fast now. The only current hiccup (not
>> > completely done re-working my tests, though) I am hitting now is that
>> > messages accepted by the producer after the server has been stopped
>> > never return a status if the producer is stopped; I think this is a bug.
>> >
>> > Are you sure you wouldn't want any of this behavior in the client by
>> > default, which would give out-of-the-box choices on blocking behavior?
>> > Happy to share code or send a PR.
>> >
>> > Thanks,
>> > Paul
>> >
>> > On Fri, Dec 19, 2014 at 2:05 PM, Jay Kreps  wrote:
>> >
>> > > Hey Paul,
>> > >
>> > > I agree we should document this better.
>> > >
>> > > We allow and encourage using partitions to semantically distribute
>> > > data. So unfortunately we can't just arbitrarily assign a partition
>> > > (say 0) as that would actually give incorrect answers for any
>> > > consumer that made use of the partitioning. It is true that the user
>> > > can change the partitioning, but we can't ignore the partitioning
>> > > they have set.
>> > >
>> > > I get the use case you have--you basically want a hard guarantee
>> > > that send() will never block (so presumably you have set to also
>> > > drop data if the buffer fills up). As I said, the blocking only
>> > > occurs on the first request for a given topic and you can avoid it
>> > > by pre-initializing the topic metadata.
>> > >
>> > > I think the option you describe is actually possible now. Basically
>> > > you can initialize the metadata for topics you care about using that
>> > > partitionsFor() call. If you set the property
>> > > metadata.fetch.timeout.ms=0 then any send calls prior to the
>> > > completion of metadata initialization will fail immediately rather
>> > > than block.
>> > >
>> > > -Jay
>> > >
>> > >
>> > > On Fri, Dec 19, 2014 at 9:32 AM, Paul Pearcy  wrote:
>> > > >
>> > > > Hi Jay,
>> > > >   Many thanks for the info. All that makes sense, but from an API
>> > > > standpoint when something is labelled async and returns a Future,
>> > > > this will be misconstrued and developers will place async sends in
>> > > > critical client-facing request/response pathways of code that
>> > > > should never block. If the app comes up with a bad config, it will
>> > > > hang all incoming connections.
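A rough sketch (not from the thread) of the pre-initialization approach discussed above, assuming the 0.8.2 new-producer API (KafkaProducer, partitionsFor(), and the metadata.fetch.timeout.ms setting). It uses the 1 ms workaround Paul mentions, since 0 currently means "block forever"; the factory class and its parameter names are purely illustrative, and props is assumed to already carry bootstrap.servers plus whatever serializer settings the particular 0.8.2 build requires:

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

// Sketch only: pre-initialize topic metadata so later send() calls fail fast
// instead of blocking. Class and parameter names are illustrative, not Kafka APIs.
public final class PreInitializedProducerFactory {

    public static KafkaProducer create(Properties props,
                                       List<String> expectedTopics,
                                       long initTimeoutMs,
                                       boolean failIfNotReady) {
        // Per the discussion above, 0 currently means "block forever", so use 1 ms
        // to make sends fail fast while metadata is still missing.
        props.put("metadata.fetch.timeout.ms", "1");
        KafkaProducer producer = new KafkaProducer(props);

        long deadline = System.currentTimeMillis() + initTimeoutMs;
        for (String topic : expectedTopics) {
            while (true) {
                try {
                    // partitionsFor() forces a metadata fetch for the topic.
                    List<PartitionInfo> partitions = producer.partitionsFor(topic);
                    if (partitions != null && !partitions.isEmpty()) {
                        break;
                    }
                } catch (Exception e) {
                    // Metadata not available yet; retry until the deadline.
                }
                if (System.currentTimeMillis() >= deadline) {
                    if (failIfNotReady) {
                        producer.close();
                        throw new IllegalStateException(
                                "Timed out initializing metadata for topic " + topic);
                    }
                    return producer; // best effort: hand the producer back anyway
                }
                try {
                    Thread.sleep(100);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return producer;
                }
            }
        }
        return producer;
    }
}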

Re: Produce 1 million events/seconds

2014-12-21 Thread Pramod Deshmukh
*Kafka: *Apache Kafka 0.8.1.1


*SImplePartitioner.java*
public int partition(Object key, int a_numPartitions) {
    int partition = Integer.parseInt((String) key);
    LOG.debug("SimplePartitioner Partition: " + partition);
    return partition;
}
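Purely as a sketch (not from the original mail), a more defensive variant of the same partitioner that clamps the result into the valid range even if the key encodes a number outside [0, a_numPartitions). It assumes the 0.8.x kafka.producer.Partitioner interface and the single VerifiableProperties constructor the old producer typically expects when loading partitioner.class:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

// Sketch only: clamp the requested partition instead of trusting the key blindly.
public class ClampingPartitioner implements Partitioner {

    public ClampingPartitioner(VerifiableProperties props) {
        // Constructor signature the 0.8 producer uses when instantiating partitioner.class.
    }

    public int partition(Object key, int a_numPartitions) {
        int requested = Integer.parseInt((String) key);
        return ((requested % a_numPartitions) + a_numPartitions) % a_numPartitions;
    }
}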



On Sun, Dec 21, 2014 at 10:54 PM, Pramod Deshmukh 
wrote:

> I have a requirement to prove the Kafka producer can produce 1 million
> events/second to a Kafka cluster.
>
> So far, the best I could achieve is 200k events/sec on a topic with 2
> partitions. The latency increases when adding more partitions, so I want
> to test with 2 partitions for now.
>
> Below are the details along with the producer code (Java). How can I achieve
> 1 million events/sec? I went through the Kafka benchmarking blog as well.
>
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
>
> *Kafka cluster:* 3 brokers on 3 servers. Each server is 16 TB (16 JBODs),
> 64GB RAM.
> *Broker:* Allocated 6GB, 16 io.threads, 8 network threads.
> *Topic: 2* partition, replication factor of 1 (Get high latency)
> *Zookeepers: *3 zk instances running individually on master nodes (not
> co-located with kafka broker/servers)
>
>
> *Producer Code:*
> // (Imports added for completeness; 0.8.1.1 old producer API.)
> import java.util.Date;
> import java.util.Properties;
> import java.util.Random;
>
> import kafka.javaapi.producer.Producer;
> import kafka.producer.KeyedMessage;
> import kafka.producer.ProducerConfig;
> import org.apache.log4j.Logger;
>
> public class TestProducer {
>
>     private static String msg = "TEST KAFKA PERFORMANCE";
>     private static Logger LOG = Logger.getLogger(TestProducer.class);
>
>     public static void main(String... args) {
>         System.out.println("START - Test Producer");
>
>         long messageCount = Long.parseLong(args[0]);
>         long messageCountForStat = Long.parseLong(args[0]);
>         String topic = args[1];
>         String brokerList = args[2];
>         int batchCount = Integer.parseInt(args[3]);
>         int topicPartions = Integer.parseInt(args[4]);
>         Producer<String, String> producer = getProducer(brokerList, batchCount);
>         Date startTime = new Date(System.currentTimeMillis());
>         Random rnd = new Random();
>         String partition = "";
>         // Produce messages.
>         while (messageCount != 0) {
>             partition = "" + (int) messageCount % topicPartions;
>             KeyedMessage<String, String> message =
>                     new KeyedMessage<String, String>(topic, partition, msg);
>             producer.send(message);
>             messageCount--;
>         }
>
>         Date endTime = new Date(System.currentTimeMillis());
>         System.out.println("#");
>         System.out.println("MESSAGES SENT: " + messageCountForStat);
>         System.out.println("START TIME: " + startTime);
>         System.out.println("END TIME: " + endTime);
>         System.out.println("#");
>         System.out.println("END - Test Producer");
>     }
>
>     public static Producer<String, String> getProducer(String brokerList,
>                                                        int batchSize) {
>         Properties props = new Properties();
>         props.put("metadata.broker.list", brokerList);
>         props.put("serializer.class", "kafka.serializer.StringEncoder");
>         props.put("partitioner.class", "com.my.SimplePartitioner");
>         props.put("request.required.acks", "0");
>         props.put("producer.type", "async");
>         props.put("compression.codec", "snappy");
>         props.put("batch.num.messages", Integer.toString(batchSize));
>
>         ProducerConfig config = new ProducerConfig(props);
>
>         Producer<String, String> producer = new Producer<String, String>(config);
>         return producer;
>     }
> }
>


Produce 1 million events/seconds

2014-12-21 Thread Pramod Deshmukh
I have a requirement to prove the Kafka producer can produce 1 million
events/second to a Kafka cluster.

So far, the best I could achieve is 200k events/sec on a topic with 2
partitions. The latency increases when adding more partitions, so I want to
test with 2 partitions for now.

Below are the details along with the producer code (Java). How can I achieve
1 million events/sec? I went through the Kafka benchmarking blog as well.
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

*Kafka cluster:* 3 brokers on 3 servers. Each server is 16 TB (16 JBODs),
64GB RAM.
*Broker:* Allocated 6GB, 16 io.threads, 8 network threads.
*Topic: 2* partition, replication factor of 1 (Get high latency)
*Zookeepers: *3 zk instances running individually on master nodes (not
co-located with kafka broker/servers)


*Producer Code:*
// (Imports added for completeness; 0.8.1.1 old producer API.)
import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.log4j.Logger;

public class TestProducer {

    private static String msg = "TEST KAFKA PERFORMANCE";
    private static Logger LOG = Logger.getLogger(TestProducer.class);

    public static void main(String... args) {
        System.out.println("START - Test Producer");

        long messageCount = Long.parseLong(args[0]);
        long messageCountForStat = Long.parseLong(args[0]);
        String topic = args[1];
        String brokerList = args[2];
        int batchCount = Integer.parseInt(args[3]);
        int topicPartions = Integer.parseInt(args[4]);
        Producer<String, String> producer = getProducer(brokerList, batchCount);
        Date startTime = new Date(System.currentTimeMillis());
        Random rnd = new Random();
        String partition = "";
        // Produce messages.
        while (messageCount != 0) {
            partition = "" + (int) messageCount % topicPartions;
            KeyedMessage<String, String> message =
                    new KeyedMessage<String, String>(topic, partition, msg);
            producer.send(message);
            messageCount--;
        }

        Date endTime = new Date(System.currentTimeMillis());
        System.out.println("#");
        System.out.println("MESSAGES SENT: " + messageCountForStat);
        System.out.println("START TIME: " + startTime);
        System.out.println("END TIME: " + endTime);
        System.out.println("#");
        System.out.println("END - Test Producer");
    }

    public static Producer<String, String> getProducer(String brokerList,
                                                       int batchSize) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "com.my.SimplePartitioner");
        props.put("request.required.acks", "0");
        props.put("producer.type", "async");
        props.put("compression.codec", "snappy");
        props.put("batch.num.messages", Integer.toString(batchSize));

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);
        return producer;
    }
}
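Not from the original mail: since the goal is a messages/sec number, here is a tiny sketch of a helper that turns the START/END timestamps printed by the main() above into a throughput figure (class and method names are illustrative):

import java.util.Date;

// Sketch: derive messages/sec from the counters and timestamps used in main() above.
public final class ThroughputStats {
    public static double messagesPerSecond(long messagesSent, Date startTime, Date endTime) {
        long elapsedMs = Math.max(endTime.getTime() - startTime.getTime(), 1);
        return messagesSent * 1000.0 / elapsedMs;
    }
}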


RE: Trying to figure out kafka latency issues

2014-12-21 Thread Thunder Stumpges
Ah I thought it was restarting the broker that made things better :)

Yeah I have no experience with the Java client so can't really help there.

Good luck!

-Original Message-
From: Rajiv Kurian [ra...@signalfuse.com]
Received: Sunday, 21 Dec 2014, 12:25PM
To: users@kafka.apache.org [users@kafka.apache.org]
Subject: Re: Trying to figure out kafka latency issues

I'll take a look at the GC profile of the brokers. Right now I keep a tab on
the CPU, messages in, bytes in, bytes out, free memory (on the machine, not
the JVM heap), and free disk space on the broker. I'll need to take a look at
the JVM metrics too. What seemed strange is that going from 8 -> 512
partitions increases the latency, but going from 512 -> 8 does not decrease
it. I have to restart the producer (but not the broker) for the end-to-end
latency to go down. That made it seem that the fault was probably with the
producer and not the broker. Only restarting the producer made things better.
I'll do more extensive measurements on the broker.

On Sun, Dec 21, 2014 at 9:08 AM, Thunder Stumpges 
wrote:
>
> Did you see my response and have you checked the server logs especially
> the GC logs? It still sounds like you are running out of memory on the
> broker. What is your max heap memory and are you thrashing once you start
> writing to all those partitions?
>
> You have measured very thoroughly from an external point of view, I think
> now you'll have to start measuring the internal metrics. Maybe someone else
> will have ideas on what jmx values to watch.
>
> Best,
> Thunder
>
>
> -Original Message-
> From: Rajiv Kurian [ra...@signalfuse.com]
> Received: Saturday, 20 Dec 2014, 10:24PM
> To: users@kafka.apache.org [users@kafka.apache.org]
> Subject: Re: Trying to figure out kafka latency issues
>
> Some more work tells me that the end-to-end latency numbers vary with the
> number of partitions I am writing to. I did an experiment where, based on
> a run-time flag, I would dynamically select how many of the *1024
> partitions* I write to. So say I decide I'll write to at most 256
> partitions, I mod whatever partition I would actually write to by 256.
> Basically the number of partitions for this topic on the broker remains
> the same at *1024* partitions, but the number of partitions my producers
> write to changes dynamically based on a run-time flag. So something like
> this:
>
> int partition = getPartitionForMessage(message);
> // This flag can be updated without bringing the application down - just
> // a volatile read of some number set externally.
> int maxPartitionsToWriteTo = maxPartitionsFlag.get();
> int moddedPartition = partition % maxPartitionsToWriteTo;
> // Send a message to this Kafka partition.
>
> Here are some interesting things I've noticed:
>
> i) When I start my client and it *never writes* to more than *8
> partitions* (same data rate but fewer partitions) - the end-to-end *99th
> latency is 300-350 ms*. Quite a bit of this (numbers in my previous
> emails) is the latency from producer -> broker and the latency from
> broker -> consumer. Still nowhere near as poor as the *20 - 30* seconds I
> was seeing.
>
> ii) When I increase the maximum number of partitions, end-to-end latency
> increases dramatically. At *256 partitions* the end-to-end *99th latency
> is still 390 - 418 ms*. Worse than the latency figures for *8*
> partitions, but not by much. When I increase this number to *512
> partitions*, the end-to-end *99th latency* becomes an intolerable *19-24
> seconds*. At *1024* partitions, the *99th latency is at 25 - 30 seconds*.
> A table of the numbers:
>
> Max number of partitions written to (out of 1024) | End-to-end latency
> 8                                                 | 300 - 350 ms
> 256                                               | 390 - 418 ms
> 512                                               | 19 - 24 seconds
> 1024                                              | 25 - 30 seconds
>
> iii) Once I make the max number of partitions high enough, reducing it
> doesn't help. For example: if I go up from *8* to *512* partitions, the
> latency goes up. But while the producer is running, if I go down from
> *512* to *8* partitions, it doesn't reduce the latency numbers. My guess
> is that the producer is creating some state lazily per partition and this
> state is causing the latency. Once this state is created, writing to
> fewer partitions doesn't seem to help. Only a restart of the producer
> calms things down.
>
> So my current plan is to reduce the number of partitions on the topic,
> but there seems to be something deeper going on for the latency numbers
> to be so poor to begin with and then suffer so much more (non-linearly)
> with additional partitions.
>
> Thanks!
>
> On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian 
> wrote:
> >
> > I've done some more measurements. I've also started measuring the latency
> > between when I ask my producer to send a message and when I get an
> > acknowledgement via the callback. Here is my code:
> >
> > // This function is called on every producer once every 30 seconds.
> >
> > public void addLagMarkers(final Histogram enqueueLag) {
> >
> >  

Re: Trying to figure out kafka latency issues

2014-12-21 Thread Rajiv Kurian
I'll take a look at the GC profile of the brokers. Right now I keep a tab on
the CPU, messages in, bytes in, bytes out, free memory (on the machine, not
the JVM heap), and free disk space on the broker. I'll need to take a look at
the JVM metrics too. What seemed strange is that going from 8 -> 512
partitions increases the latency, but going from 512 -> 8 does not decrease
it. I have to restart the producer (but not the broker) for the end-to-end
latency to go down. That made it seem that the fault was probably with the
producer and not the broker. Only restarting the producer made things better.
I'll do more extensive measurements on the broker.

On Sun, Dec 21, 2014 at 9:08 AM, Thunder Stumpges 
wrote:
>
> Did you see my response and have you checked the server logs especially
> the GC logs? It still sounds like you are running out of memory on the
> broker. What is your max heap memory and are you thrashing once you start
> writing to all those partitions?
>
> You have measured very thoroughly from an external point of view, I think
> now you'll have to start measuring the internal metrics. Maybe someone else
> will have ideas on what jmx values to watch.
>
> Best,
> Thunder
>
>
> -Original Message-
> From: Rajiv Kurian [ra...@signalfuse.com]
> Received: Saturday, 20 Dec 2014, 10:24PM
> To: users@kafka.apache.org [users@kafka.apache.org]
> Subject: Re: Trying to figure out kafka latency issues
>
> Some more work tells me that the end-to-end latency numbers vary with the
> number of partitions I am writing to. I did an experiment where, based on
> a run-time flag, I would dynamically select how many of the *1024
> partitions* I write to. So say I decide I'll write to at most 256
> partitions, I mod whatever partition I would actually write to by 256.
> Basically the number of partitions for this topic on the broker remains
> the same at *1024* partitions, but the number of partitions my producers
> write to changes dynamically based on a run-time flag. So something like
> this:
>
> int partition = getPartitionForMessage(message);
> // This flag can be updated without bringing the application down - just
> // a volatile read of some number set externally.
> int maxPartitionsToWriteTo = maxPartitionsFlag.get();
> int moddedPartition = partition % maxPartitionsToWriteTo;
> // Send a message to this Kafka partition.
>
> Here are some interesting things I've noticed:
>
> i) When I start my client and it *never writes* to more than *8
> partitions* (same data rate but fewer partitions) - the end-to-end *99th
> latency is 300-350 ms*. Quite a bit of this (numbers in my previous
> emails) is the latency from producer -> broker and the latency from
> broker -> consumer. Still nowhere near as poor as the *20 - 30* seconds I
> was seeing.
>
> ii) When I increase the maximum number of partitions, end-to-end latency
> increases dramatically. At *256 partitions* the end-to-end *99th latency
> is still 390 - 418 ms*. Worse than the latency figures for *8*
> partitions, but not by much. When I increase this number to *512
> partitions*, the end-to-end *99th latency* becomes an intolerable *19-24
> seconds*. At *1024* partitions, the *99th latency is at 25 - 30 seconds*.
> A table of the numbers:
>
> Max number of partitions written to (out of 1024) | End-to-end latency
> 8                                                 | 300 - 350 ms
> 256                                               | 390 - 418 ms
> 512                                               | 19 - 24 seconds
> 1024                                              | 25 - 30 seconds
>
> iii) Once I make the max number of partitions high enough, reducing it
> doesn't help. For example: if I go up from *8* to *512* partitions, the
> latency goes up. But while the producer is running, if I go down from
> *512* to *8* partitions, it doesn't reduce the latency numbers. My guess
> is that the producer is creating some state lazily per partition and this
> state is causing the latency. Once this state is created, writing to
> fewer partitions doesn't seem to help. Only a restart of the producer
> calms things down.
>
> So my current plan is to reduce the number of partitions on the topic,
> but there seems to be something deeper going on for the latency numbers
> to be so poor to begin with and then suffer so much more (non-linearly)
> with additional partitions.
>
> Thanks!
>
> On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian 
> wrote:
> >
> > I've done some more measurements. I've also started measuring the latency
> > between when I ask my producer to send a message and when I get an
> > acknowledgement via the callback. Here is my code:
> >
> > // This function is called on every producer once every 30 seconds.
> >
> > public void addLagMarkers(final Histogram enqueueLag) {
> >
> > final int numberOfPartitions = 1024;
> >
> > final long timeOfEnqueue = System.currentTimeMillis();
> >
> > final Callback callback = new Callback() {
> >
> > @Override
> >
> > public void onCompletion(RecordMetadata metadata, Exception
> ex)
> > {
> >
> > if (metadata != null) {
> >
> > // The

RE: Trying to figure out kafka latency issues

2014-12-21 Thread Thunder Stumpges
Did you see my response and have you checked the server logs especially the GC 
logs? It still sounds like you are running out of memory on the broker. What is 
your max heap memory and are you thrashing once you start writing to all those 
partitions?

You have measured very thoroughly from an external point of view, I think now
you'll have to start measuring the internal metrics. Maybe someone else will 
have ideas on what jmx values to watch.

Best,
Thunder


-Original Message-
From: Rajiv Kurian [ra...@signalfuse.com]
Received: Saturday, 20 Dec 2014, 10:24PM
To: users@kafka.apache.org [users@kafka.apache.org]
Subject: Re: Trying to figure out kafka latency issues

Some more work tells me that the end-to-end latency numbers vary with the
number of partitions I am writing to. I did an experiment where, based on a
run-time flag, I would dynamically select how many of the *1024 partitions*
I write to. So say I decide I'll write to at most 256 partitions, I mod
whatever partition I would actually write to by 256. Basically the number
of partitions for this topic on the broker remains the same at *1024*
partitions, but the number of partitions my producers write to changes
dynamically based on a run-time flag. So something like this:

int partition = getPartitionForMessage(message);
// This flag can be updated without bringing the application down - just a
// volatile read of some number set externally.
int maxPartitionsToWriteTo = maxPartitionsFlag.get();
int moddedPartition = partition % maxPartitionsToWriteTo;
// Send a message to this Kafka partition.

Here are some interesting things I've noticed:

i) When I start my client and it *never writes* to more than *8 partitions*
(same data rate but fewer partitions) - the end-to-end *99th latency is
300-350 ms*. Quite a bit of this (numbers in my previous emails) is the
latency from producer -> broker and the latency from broker -> consumer.
Still nowhere near as poor as the *20 - 30* seconds I was seeing.

ii) When I increase the maximum number of partitions, end-to-end latency
increases dramatically. At *256 partitions* the end-to-end *99th latency is
still 390 - 418 ms*. Worse than the latency figures for *8* partitions, but
not by much. When I increase this number to *512 partitions*, the end-to-end
*99th latency* becomes an intolerable *19-24 seconds*. At *1024* partitions,
the *99th latency is at 25 - 30 seconds*.
A table of the numbers:

Max number of partitions written to (out of 1024) | End-to-end latency
8                                                 | 300 - 350 ms
256                                               | 390 - 418 ms
512                                               | 19 - 24 seconds
1024                                              | 25 - 30 seconds

iii) Once I make the max number of partitions high enough, reducing it
doesn't help. For example: if I go up from *8* to *512* partitions, the
latency goes up. But while the producer is running, if I go down from *512*
to *8* partitions, it doesn't reduce the latency numbers. My guess is that
the producer is creating some state lazily per partition and this state is
causing the latency. Once this state is created, writing to fewer partitions
doesn't seem to help. Only a restart of the producer calms things down.

So my current plan is to reduce the number of partitions on the topic, but
there seems to be something deeper going on for the latency numbers to be
so poor to begin with and then suffer so much more (non-linearly) with
additional partitions.

Thanks!

On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian  wrote:
>
> I've done some more measurements. I've also started measuring the latency
> between when I ask my producer to send a message and when I get an
> acknowledgement via the callback. Here is my code:
>
> // This function is called on every producer once every 30 seconds.
> public void addLagMarkers(final Histogram enqueueLag) {
>     final int numberOfPartitions = 1024;
>     final long timeOfEnqueue = System.currentTimeMillis();
>     final Callback callback = new Callback() {
>         @Override
>         public void onCompletion(RecordMetadata metadata, Exception ex) {
>             if (metadata != null) {
>                 // The difference between ack time from broker and enqueue time.
>                 final long timeOfAck = System.currentTimeMillis();
>                 final long lag = timeOfAck - timeOfEnqueue;
>                 enqueueLag.update(lag);
>             }
>         }
>     };
>     for (int i = 0; i < numberOfPartitions; i++) {
>         try {
>             // 10 bytes -> short version + long timestamp.
>             byte[] value = LagMarker.serialize(timeOfEnqueue);
>             // This message is later used by the consumers to measure lag.
>             ProducerRecord record = new ProducerRecord(MY_TOPIC, i, null, value);
>             kafkaProducer.send(record, callback);
>         } catch (Exception e) {
>             // We just dropped a lag marker.
>         }
>     }
> }
>
> The* 99th* on this lag i
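A consumer-side counterpart (a sketch, not part of the thread) showing how a lag marker like the one above might be turned into an end-to-end latency sample. LagMarker.deserialize is a hypothetical mirror of the LagMarker.serialize call in the producer code, and Histogram is assumed to be the same metrics Histogram already used for enqueueLag:

import com.codahale.metrics.Histogram;

// Sketch only: compute end-to-end lag on the consumer side from a lag-marker payload.
// LagMarker.deserialize(byte[]) is assumed to return the enqueue timestamp written by
// the producer-side LagMarker.serialize(long) above (hypothetical helper).
public final class LagMarkerConsumer {

    public static void recordEndToEndLag(byte[] lagMarkerPayload, Histogram endToEndLag) {
        long timeOfEnqueue = LagMarker.deserialize(lagMarkerPayload);
        long lag = System.currentTimeMillis() - timeOfEnqueue;
        endToEndLag.update(lag);
    }
}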

Re: Kafka 0.8.2 new producer blocking on metadata

2014-12-21 Thread Paul Pearcy
Sounds good.

Yes, I'd want a guarantee that every future I get will always return the
recordmeta or an exception eventually.

Running into a similar issue with futures never returning with a pretty
straightforward case:
- Healthy producer/server setup
- Stop the server
- Send a message
- Call get on the future and it never returns. Doesn't matter if the server
is started again or remains stopped

Stepping into the producer code, it appears that in Sender.run a response
never comes back for the send while things are down and handleDisconnect is
never called.

These are basically the same tests I am running against 0.8.1.1 producer,
but I could have some wires crossed, so would be curious if others see
similar.

Thanks,
Paul
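A rough sketch (not from the thread) of the reproduction sequence above as a small standalone program, assuming the 0.8.2 new-producer API. The broker address, topic name, and any serializer settings the particular 0.8.2 build requires are placeholders, and the bounded get() is only there so the program itself cannot hang:

import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Sketch only: walks through the sequence described above.
public class SendAfterBrokerStop {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        KafkaProducer producer = new KafkaProducer(props);

        // 1) Healthy producer/server setup.
        // 2) Stop the broker out-of-band here.
        // 3) Send a message.
        Future<RecordMetadata> future =
                producer.send(new ProducerRecord("test-topic", "hello".getBytes()));

        // 4) Call get on the future. In the report above this never returns,
        //    whether or not the broker comes back; a bounded get() at least
        //    turns the hang into a visible timeout.
        RecordMetadata meta = future.get(30, TimeUnit.SECONDS);
        System.out.println("acked at offset " + meta.offset());
        producer.close();
    }
}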


On Fri, Dec 19, 2014 at 5:27 PM, Jay Kreps  wrote:

> Yeah if you want to file a JIRA and post a patch for a new option it's
> possible others would want it. Maybe something like
>   pre.initialize.topics=x,y,z
>   pre.initialize.timeout=x
>
> The metadata fetch timeout is a bug...that behavior is inherited from
> Object.wait which defines zero to mean infinite but I think that is not
> very intuitive. If you file a ticket on that we could just fix it. I think
> being able to set 0 is actually useful for this case you are trying for.
>
> WRT stopping the producer, I think what you are saying is that you want
> it to be the case that calling close() on the producer immediately fails
> all outstanding requests with some exception, right?
>
> -Jay
>
> On Fri, Dec 19, 2014 at 1:55 PM, Paul Pearcy  wrote:
> >
> > Hi Jay,
> >   I have implemented a wrapper around the producer to behave like I want
> > it to. Where it diverges from the current 0.8.2 producer is that it
> > accepts three new inputs:
> > - A list of expected topics
> > - A timeout value to init meta for those topics during producer creation
> > - An option to blow up if we fail to init topic meta within some amount
> > of time
> >
> > I also needed to set metadata.fetch.timeout.ms=1 (as 0 means it will
> > block forever) and kick off a thread to do the topic metadata init in
> > the background.
> >
> > On the send side, things do fail fast now. The only current hiccup (not
> > completely done re-working my tests, though) I am hitting now is that
> > messages accepted by the producer after the server has been stopped
> > never return a status if the producer is stopped; I think this is a bug.
> >
> > Are you sure you wouldn't want any of this behavior in the client by
> > default, which would give out-of-the-box choices on blocking behavior?
> > Happy to share code or send a PR.
> >
> > Thanks,
> > Paul
> >
> > On Fri, Dec 19, 2014 at 2:05 PM, Jay Kreps  wrote:
> >
> > > Hey Paul,
> > >
> > > I agree we should document this better.
> > >
> > > We allow and encourage using partitions to semantically distribute
> > > data. So unfortunately we can't just arbitrarily assign a partition
> > > (say 0) as that would actually give incorrect answers for any consumer
> > > that made use of the partitioning. It is true that the user can change
> > > the partitioning, but we can't ignore the partitioning they have set.
> > >
> > > I get the use case you have--you basically want a hard guarantee that
> > > send() will never block (so presumably you have set to also drop data
> > > if the buffer fills up). As I said, the blocking only occurs on the
> > > first request for a given topic and you can avoid it by
> > > pre-initializing the topic metadata.
> > >
> > > I think the option you describe is actually possible now. Basically
> > > you can initialize the metadata for topics you care about using that
> > > partitionsFor() call. If you set the property
> > > metadata.fetch.timeout.ms=0 then any send calls prior to the
> > > completion of metadata initialization will fail immediately rather
> > > than block.
> > >
> > > -Jay
> > >
> > >
> > > On Fri, Dec 19, 2014 at 9:32 AM, Paul Pearcy  wrote:
> > > >
> > > > Hi Jay,
> > > >   Many thanks for the info. All that makes sense, but from an API
> > > > standpoint when something is labelled async and returns a Future,
> > > > this will be misconstrued and developers will place async sends in
> > > > critical client-facing request/response pathways of code that should
> > > > never block. If the app comes up with a bad config, it will hang all
> > > > incoming connections.
> > > >
> > > > Obviously, there is a spectrum of use cases with regard to message
> > > > loss and the defaults cannot cater to all use cases. I like that the
> > > > defaults tend towards best-effort guarantees, but I am not sure it
> > > > justifies the inconsistency in the API.
> > > >
> > > > 1) It sounds like the client is already structured to handle changes
> > > > in partitions on the fly. I am sure I am over-simplifying, but in the
> > > > case where no meta is available, my naive approach would be to assume
> > > > some number of partitions and t