Producer is very slow in Kafka 0.8
Hi, I was trying out Kafka 0.8 and was using the PHP kafka extension ( https://github.com/salebab/phpkafka ) to produce from PHP. Unfortunately it does not reliably produce faster than about 3 messages per second. Was wondering if anyone can help me speed this up. Thanks, Saurabh
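With the 0.8-era producer, a synchronous send per message caps throughput at roughly one network round trip per message, which would explain single-digit rates; batching asynchronously usually helps regardless of the client language. If the PHP extension exposes the standard 0.8 producer settings, the relevant knobs look like this (a sketch using the Java producer's property names; I have not verified which of these the extension supports):

```properties
# Async mode batches messages instead of doing one round trip per send.
producer.type=async
batch.num.messages=200
# Flush a partial batch after at most this many milliseconds.
queue.buffering.max.ms=100
# Don't wait for broker acknowledgement (trades durability for speed).
request.required.acks=0
```

If the extension only supports synchronous sends, batching several messages per produce call on the application side is the next best option.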
Re: Kafka 0.8.2 new producer blocking on metadata
FYI, I bumped the server to 0.8.2-beta and I don't hit the basic failure I mentioned above, which is great. I haven't been able to find confirmation in the docs, but from a past conversation ( http://mail-archives.apache.org/mod_mbox/kafka-users/201408.mbox/%3c20140829174552.ga30...@jkoshy-ld.linkedin.biz%3E ), it seems that the 0.8.2 producer should be fully compatible with the 0.8.1.1 broker. From everything I see running in a single node config, the 0.8.2 Java producer is effectively dead after a complete disconnect from the 0.8.1.1 broker. Thanks, Paul On Sun, Dec 21, 2014 at 3:06 AM, Paul Pearcy wrote: > Sounds good. > > Yes, I'd want a guarantee that every future I get will always return the > recordmeta or an exception eventually. > > Running into a similar issue with futures never returning with a pretty > straightforward case: > - Healthy producer/server setup > - Stop the server > - Send a message > - Call get on the future and it never returns. Doesn't matter if the > server is started again or remains stopped > > Stepping into the producer code, it appears that in Sender.run a response > never comes back for the send while things are down and handleDisconnect is > never called. > > These are basically the same tests I am running against the 0.8.1.1 producer, > but I could have some wires crossed, so would be curious if others see > similar. > > Thanks, > Paul > > > On Fri, Dec 19, 2014 at 5:27 PM, Jay Kreps wrote: > >> Yeah, if you want to file a JIRA and post a patch for a new option it's >> possible others would want it. Maybe something like >> pre.initialize.topics=x,y,z >> pre.initialize.timeout=x >> >> The metadata fetch timeout is a bug... that behavior is inherited from >> Object.wait, which defines zero to mean infinite, but I think that is not >> very intuitive. If you file a ticket on that we could just fix it. I think >> being able to set 0 is actually useful for this case you are trying for. 
>> >> WRT stopping the producer, I think what you are saying is that you want >> it to be the case that calling close() on the producer immediately fails >> all outstanding requests with some exception, right? >> >> -Jay >> >> On Fri, Dec 19, 2014 at 1:55 PM, Paul Pearcy wrote: >> > >> > Hi Jay, >> > I have implemented a wrapper around the producer to behave like I >> want it >> > to. Where it diverges from the current 0.8.2 producer is that it accepts >> three >> > new inputs: >> > - A list of expected topics >> > - A timeout value to init metadata for those topics during producer >> creation >> > - An option to blow up if we fail to init topic metadata within some amount >> of >> > time >> > >> > I also needed to set metadata.fetch.timeout.ms=1, as 0 means it will >> > block >> > forever, and kick off a thread to do the topic metadata init in the >> > background. >> > >> > On the send side, things do fail fast now. The only current hiccup (not >> > completely done re-working my tests, though) I am hitting now is that >> > messages accepted by the producer after the server has been stopped >> never >> > return a status if the producer is stopped; I think this is a bug. >> > >> > Are you sure you wouldn't want any of this behavior in the client by default, >> > which would give out of the box choices to be made on blocking behavior? >> > Happy to share code or send a PR. >> > >> > Thanks, >> > Paul >> > >> > On Fri, Dec 19, 2014 at 2:05 PM, Jay Kreps wrote: >> > >> > > Hey Paul, >> > > >> > > I agree we should document this better. >> > > >> > > We allow and encourage using partitions to semantically distribute >> data. >> So >> > > unfortunately we can't just arbitrarily assign a partition (say 0), as >> > that >> > > would actually give incorrect answers for any consumer that made use >> of >> > the >> > > partitioning. It is true that the user can change the partitioning, >> but >> > we >> > > can't ignore the partitioning they have set. 
>> > > >> > > I get the use case you have--you basically want a hard guarantee that >> > > send() will never block (so presumably you have set to also drop data >> if >> > > the buffer fills up). As I said the blocking only occurs on the first >> > > request for a given topic and you can avoid it by pre-initializing the >> > > topic metadata. >> > > >> > > I think the option you describe is actually possible now. Basically >> you >> > can >> > > initialize the metadata for topics you care about using that >> > > partitionsFor() call. If you set the property >> metadata.fetch.timeout.ms >> > =0 >> > > then any send calls prior to the completion of metadata initialization >> > will >> > > fail immediately rather than block. >> > > >> > > -Jay >> > > >> > > >> > > On Fri, Dec 19, 2014 at 9:32 AM, Paul Pearcy >> wrote: >> > > > >> > > > Hi Jay, >> > > > Many thanks for the info. All that makes sense, but from an API >> > > > standpoint when something is labelled async and returns a Future, >> this >> > > will >> > > > be misconstrued and developers will place async sends in critical >>
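The wrapper Paul describes in this thread (a list of expected topics, a metadata-init timeout, and fail-fast sends until init completes) can be sketched independently of the Kafka classes. This is a shape sketch only: `FailFastProducer` and `initializer` are my names, not Kafka API, and the initializer stands in for a real `producer.partitionsFor(topic)` call.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Sketch of a fail-fast producer wrapper: a send to a topic whose
// metadata has not been initialized yet throws instead of blocking.
public class FailFastProducer {
    private final ConcurrentHashMap<String, Boolean> ready = new ConcurrentHashMap<>();

    // initializer stands in for producer.partitionsFor(topic); it should
    // return true once metadata for the topic is available.
    public FailFastProducer(List<String> expectedTopics,
                            Predicate<String> initializer,
                            long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        for (String topic : expectedTopics) {
            boolean ok = initializer.test(topic);
            while (!ok && System.currentTimeMillis() < deadline) {
                ok = initializer.test(topic);
            }
            if (!ok) {
                // "Blow up" option: refuse to start with missing metadata.
                throw new IllegalStateException("metadata init timed out for " + topic);
            }
            ready.put(topic, Boolean.TRUE);
        }
    }

    // Fail immediately rather than block when metadata is missing.
    public void send(String topic, byte[] payload) {
        if (!ready.containsKey(topic)) {
            throw new IllegalStateException("no metadata for topic " + topic);
        }
        // ... delegate to the real producer here ...
    }
}
```

With `metadata.fetch.timeout.ms=0` the real client's send path behaves similarly for the uninitialized case: it fails immediately instead of blocking, which is the combination Jay suggests above.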
Re: Produce 1 million events/seconds
*Kafka:* Apache Kafka 0.8.1.1

*SimplePartitioner.java*

public int partition(Object key, int a_numPartitions) {
    int partition = Integer.parseInt((String) key);
    LOG.debug("SimplePartitioner Partition: " + partition);
    return partition;
}

On Sun, Dec 21, 2014 at 10:54 PM, Pramod Deshmukh wrote: > I have a requirement to prove a kafka producer can produce 1 million > events/second to a Kafka cluster. > > So far, the best I could achieve is 200k events/sec on a topic with 2 > partitions. The latency increases with adding more partitions, so I want to > test with 2 partitions for now. > > Below are the details along with the producer code (Java). How can I > produce 1 million events/sec? I went through the kafka benchmarking blog as well. > > https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines > > *Kafka cluster:* 3 brokers on 3 servers. Each server is 16 TB (16 JBODs), > 64GB RAM. > *Broker:* Allocated 6GB, 16 io.threads, 8 network threads. > *Topic:* 2 partitions, replication factor of 1 (get high latency) > *Zookeepers:* 3 zk instances running individually on master nodes (not > co-located with kafka broker/servers) > > > *Producer Code:* > public class TestProducer { > > private static String msg = "TEST KAFKA PERFORMANCE"; > private static Logger LOG = Logger.getLogger(TestProducer.class); > > public static void main(String... args){ > System.out.println("START - Test Producer"); > > long messageCount = Long.parseLong(args[0]); > long messageCountForStat = Long.parseLong(args[0]); > String topic = args[1]; > String brokerList = args[2]; > int batchCount = Integer.parseInt(args[3]); > int topicPartions = Integer.parseInt(args[4]); > Producer producer = getProducer(brokerList, > batchCount); > Date startTime = new Date(System.currentTimeMillis()); > Random rnd = new Random(); > String partition = ""; > //Produce messages. 
> while (messageCount != 0) {
>     partition = "" + (int) (messageCount % topicPartions);
>     KeyedMessage<String, String> message =
>         new KeyedMessage<String, String>(topic, partition, msg);
>     producer.send(message);
>     messageCount--;
> }
>
> Date endTime = new Date(System.currentTimeMillis());
> System.out.println("#");
> System.out.println("MESSAGES SENT: " + messageCountForStat);
> System.out.println("START TIME: " + startTime);
> System.out.println("END TIME: " + endTime);
> System.out.println("#");
> System.out.println("END - Test Producer");
> }
>
> public static Producer<String, String> getProducer(String brokerList, int batchSize) {
>
>     Properties props = new Properties();
>     props.put("metadata.broker.list", brokerList);
>     props.put("serializer.class", "kafka.serializer.StringEncoder");
>     props.put("partitioner.class", "com.my.SimplePartitioner");
>     props.put("request.required.acks", "0");
>     props.put("producer.type", "async");
>     props.put("compression.codec", "snappy");
>     props.put("batch.num.messages", Integer.toString(batchSize));
>
>     ProducerConfig config = new ProducerConfig(props);
>     return new Producer<String, String>(config);
> }
> }
Produce 1 million events/seconds
I have a requirement to prove a kafka producer can produce 1 million events/second to a Kafka cluster.

So far, the best I could achieve is 200k events/sec on a topic with 2 partitions. The latency increases with adding more partitions, so I want to test with 2 partitions for now.

Below are the details along with the producer code (Java). How can I produce 1 million events/sec? I went through the kafka benchmarking blog as well:

https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

*Kafka cluster:* 3 brokers on 3 servers. Each server is 16 TB (16 JBODs), 64GB RAM.
*Broker:* Allocated 6GB, 16 io.threads, 8 network threads.
*Topic:* 2 partitions, replication factor of 1 (get high latency with more partitions).
*Zookeepers:* 3 zk instances running individually on master nodes (not co-located with kafka broker/servers).

*Producer Code:*

public class TestProducer {

    private static String msg = "TEST KAFKA PERFORMANCE";
    private static Logger LOG = Logger.getLogger(TestProducer.class);

    public static void main(String... args) {
        System.out.println("START - Test Producer");

        long messageCount = Long.parseLong(args[0]);
        long messageCountForStat = Long.parseLong(args[0]);
        String topic = args[1];
        String brokerList = args[2];
        int batchCount = Integer.parseInt(args[3]);
        int topicPartions = Integer.parseInt(args[4]);
        Producer<String, String> producer = getProducer(brokerList, batchCount);
        Date startTime = new Date(System.currentTimeMillis());
        String partition = "";

        // Produce messages.
        while (messageCount != 0) {
            partition = "" + (int) (messageCount % topicPartions);
            KeyedMessage<String, String> message =
                    new KeyedMessage<String, String>(topic, partition, msg);
            producer.send(message);
            messageCount--;
        }

        Date endTime = new Date(System.currentTimeMillis());
        System.out.println("#");
        System.out.println("MESSAGES SENT: " + messageCountForStat);
        System.out.println("START TIME: " + startTime);
        System.out.println("END TIME: " + endTime);
        System.out.println("#");
        System.out.println("END - Test Producer");
    }

    public static Producer<String, String> getProducer(String brokerList, int batchSize) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "com.my.SimplePartitioner");
        props.put("request.required.acks", "0");
        props.put("producer.type", "async");
        props.put("compression.codec", "snappy");
        props.put("batch.num.messages", Integer.toString(batchSize));

        ProducerConfig config = new ProducerConfig(props);
        return new Producer<String, String>(config);
    }
}
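For sanity-checking runs like the one above, the two bits of arithmetic involved (the counter-to-partition mapping and the resulting events/sec) can be factored into a helper and verified without a broker. A small sketch; `ThroughputMath` and its method names are mine, not from the harness above:

```java
// Hypothetical helpers mirroring the test harness: partition selection by
// message counter, and events/sec from a wall-clock delta.
public class ThroughputMath {

    // Same scheme as the send loop: messageCount % topicPartitions,
    // rendered as a String key for SimplePartitioner to parse back.
    static String partitionKey(long messageCount, int topicPartitions) {
        return Integer.toString((int) (messageCount % topicPartitions));
    }

    // Events per second, guarding against a zero-length interval.
    static double eventsPerSec(long messages, long elapsedMillis) {
        if (elapsedMillis <= 0) {
            throw new IllegalArgumentException("elapsed time must be positive");
        }
        return messages * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        System.out.println(partitionKey(7, 2));             // prints "1"
        System.out.println(eventsPerSec(1_000_000, 5_000)); // prints "200000.0"
    }
}
```

At the reported 200k events/sec, 1 million events should take about 5 seconds of wall clock; runs much slower than that point at batching and flush settings rather than measurement error. Measuring with System.nanoTime instead of Date also avoids coarse-clock noise on short runs.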
RE: Trying to figure out kafka latency issues
Ah, I thought it was restarting the broker that made things better :) Yeah, I have no experience with the Java client so can't really help there. Good luck! -Original Message- From: Rajiv Kurian [ra...@signalfuse.com] Received: Sunday, 21 Dec 2014, 12:25PM To: users@kafka.apache.org [users@kafka.apache.org] Subject: Re: Trying to figure out kafka latency issues I'll take a look at the GC profile of the brokers. Right now I keep a tab on the CPU, Messages in, Bytes in, Bytes out, free memory (on the machine, not the JVM heap) and free disk space on the broker. I'll need to take a look at the JVM metrics too. What seemed strange is that going from 8 -> 512 partitions increases the latency, but going from 512 -> 8 does not decrease it. I have to restart the producer (but not the broker) for the end to end latency to go down. That made it seem that the fault was probably with the producer and not the broker. Only restarting the producer made things better. I'll do more extensive measurement on the broker. On Sun, Dec 21, 2014 at 9:08 AM, Thunder Stumpges wrote: > > Did you see my response, and have you checked the server logs, especially > the GC logs? It still sounds like you are running out of memory on the > broker. What is your max heap memory, and are you thrashing once you start > writing to all those partitions? > > You have measured very thoroughly from an external point of view; I think > now you'll have to start measuring the internal metrics. Maybe someone else > will have ideas on what jmx values to watch. > > Best, > Thunder > > > -Original Message- > From: Rajiv Kurian [ra...@signalfuse.com] > Received: Saturday, 20 Dec 2014, 10:24PM > To: users@kafka.apache.org [users@kafka.apache.org] > Subject: Re: Trying to figure out kafka latency issues > > Some more work tells me that the end to end latency numbers vary with the > number of partitions I am writing to. 
> I did an experiment where, based on a run time flag, I would dynamically
> select how many of the *1024 partitions* I write to. So say I decide I'll
> write to at most 256 partitions: I mod whatever partition I would actually
> write to by 256. Basically the number of partitions for this topic on the
> broker remains the same at *1024* partitions, but the number of partitions
> my producers write to changes dynamically based on a run time flag. So
> something like this:
>
> int partition = getPartitionForMessage(message);
> // This flag can be updated without bringing the application down -
> // just a volatile read of some number set externally.
> int maxPartitionsToWriteTo = maxPartitionsFlag.get();
> int moddedPartition = partition % maxPartitionsToWriteTo;
> // Send a message to this Kafka partition.
>
> Here are some interesting things I've noticed:
>
> i) When I start my client and it *never writes* to more than *8 partitions*
> (same data rate but fewer partitions), the end to end *99th latency is
> 300 - 350 ms*. Quite a bit of this (numbers in my previous emails) is the
> latency from producer -> broker and the latency from broker -> consumer.
> Still nowhere near as poor as the *20 - 30* seconds I was seeing.
>
> ii) When I increase the maximum number of partitions, end to end latency
> increases dramatically. At *256 partitions* the end to end *99th latency*
> is still *390 - 418 ms*: worse than the latency figures for *8* partitions,
> but not by much. When I increase this number to *512 partitions*, the end
> to end *99th latency* becomes an intolerable *19 - 24 seconds*. At *1024*
> partitions the *99th latency* is at *25 - 30 seconds*.
>
> A table of the numbers:
>
> Max partitions written to (of 1024)    End to end 99th latency
> 8                                      300 - 350 ms
> 256                                    390 - 418 ms
> 512                                    19 - 24 seconds
> 1024                                   25 - 30 seconds
>
> iii) Once I make the max number of partitions high enough, reducing it
> doesn't help. For example: if I go up from *8* to *512* partitions, the
> latency goes up. But while the producer is running, if I go down from *512*
> to *8* partitions, it doesn't reduce the latency numbers. My guess is that
> the producer is creating some state lazily per partition and this state is
> causing the latency. Once this state is created, writing to fewer
> partitions doesn't seem to help. Only a restart of the producer calms
> things down.
>
> So my current plan is to reduce the number of partitions on the topic, but
> there seems to be something deeper going on for the latency numbers to be
> so poor to begin with and then suffer so much more (non-linearly) with
> additional partitions.
>
> Thanks!
>
> On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian wrote:
> >
> > I've done some more measurements. I've also started measuring the latency
> > between when I ask my producer to send a message and when I get an
> > acknowledgement via the callback. Here is my code:
> >
> > // This function is called on every producer once every 30 seconds.
> >
> > public void addLagMarkers(final Histogram enqueueLag) {
> >
Re: Trying to figure out kafka latency issues
I'll take a look at the GC profile of the brokers. Right now I keep a tab on the CPU, Messages in, Bytes in, Bytes out, free memory (on the machine, not the JVM heap) and free disk space on the broker. I'll need to take a look at the JVM metrics too. What seemed strange is that going from 8 -> 512 partitions increases the latency, but going from 512 -> 8 does not decrease it. I have to restart the producer (but not the broker) for the end to end latency to go down. That made it seem that the fault was probably with the producer and not the broker. Only restarting the producer made things better. I'll do more extensive measurement on the broker. On Sun, Dec 21, 2014 at 9:08 AM, Thunder Stumpges wrote: > > Did you see my response, and have you checked the server logs, especially > the GC logs? It still sounds like you are running out of memory on the > broker. What is your max heap memory, and are you thrashing once you start > writing to all those partitions? > > You have measured very thoroughly from an external point of view; I think > now you'll have to start measuring the internal metrics. Maybe someone else > will have ideas on what jmx values to watch. > > Best, > Thunder > > > -Original Message- > From: Rajiv Kurian [ra...@signalfuse.com] > Received: Saturday, 20 Dec 2014, 10:24PM > To: users@kafka.apache.org [users@kafka.apache.org] > Subject: Re: Trying to figure out kafka latency issues > > Some more work tells me that the end to end latency numbers vary with the > number of partitions I am writing to. I did an experiment where, based on a > run time flag, I would dynamically select how many of the *1024 partitions* > I write to. So say I decide I'll write to at most 256 partitions: I mod > whatever partition I would actually write to by 256. Basically the number > of partitions for this topic on the broker remains the same at *1024* > partitions, but the number of partitions my producers write to changes > dynamically based on a run time flag. 
So something like this: > > int partition = getPartitionForMessage(message); > int maxPartitionsToWriteTo = maxPartitionsFlag.get(); // This flag can be > updated without bringing the application down - just a volatile read of > some number set externally. > int moddedPartition = partition % maxPartitionsToWrite. > // Send a message to this Kafka partition. > > Here are some interesting things I've noticed: > > i) When I start my client and it *never writes* to more than *8 > partitions *(same > data rate but fewer partitions) - the end to end *99th latency is 300-350 > ms*. Quite a bit of this (numbers in my previous emails) is the latency > from producer -> broker and the latency from broker -> consumer. Still > nowhere as poor as the *20 - 30* seconds I was seeing. > > ii) When I increase the maximum number of partitions, end to end latency > increases dramatically. At *256 partitions* the end to end *99th latency is > still 390 - 418 ms.* Worse than the latency figures for *8 *partitions, but > not by much. When I increase this number to *512 partitions *the end > to end *99th > latency *becomes an intolerable *19-24 seconds*. At *1024* partitions the > *99th > latency is at 25 - 30 seconds*. > A table of the numbers: > > Max number of partitions written to (out of 1024) > > End to end latency > > 8 > > 300 - 350 ms > > 256 > > 390 - 418 ms > > 512 > > 19 - 24 seconds > > 1024 > > 25 - 30 seconds > > > iii) Once I make the max number of partitions high enough, reducing it > doesn't help. For eg: If I go up from *8* to *512 *partitions, the latency > goes up. But while the producer is running if I go down from *512* to > *8 *partitions, > it doesn't reduce the latency numbers. My guess is that the producer is > creating some state lazily per partition and this state is causing the > latency. Once this state is created, writing to fewer partitions doesn't > seem to help. Only a restart of the producer calms things down. 
> > So my current plan is to reduce the number of partitions on the topic, but > there seems to be something deeper going on for the latency numbers to be > so poor to begin with and then suffer so much more (non linearly) with > additional partitions. > > Thanks! > > On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian > wrote: > > > > I've done some more measurements. I've also started measuring the latency > > between when I ask my producer to send a message and when I get an > > acknowledgement via the callback. Here is my code: > > > > // This function is called on every producer once every 30 seconds. > > > > public void addLagMarkers(final Histogram enqueueLag) { > > > > final int numberOfPartitions = 1024; > > > > final long timeOfEnqueue = System.currentTimeMillis(); > > > > final Callback callback = new Callback() { > > > > @Override > > > > public void onCompletion(RecordMetadata metadata, Exception > ex) > > { > > > > if (metadata != null) { > > > > // The
RE: Trying to figure out kafka latency issues
Did you see my response, and have you checked the server logs, especially the GC logs? It still sounds like you are running out of memory on the broker. What is your max heap memory, and are you thrashing once you start writing to all those partitions? You have measured very thoroughly from an external point of view; I think now you'll have to start measuring the internal metrics. Maybe someone else will have ideas on what jmx values to watch. Best, Thunder -Original Message- From: Rajiv Kurian [ra...@signalfuse.com] Received: Saturday, 20 Dec 2014, 10:24PM To: users@kafka.apache.org [users@kafka.apache.org] Subject: Re: Trying to figure out kafka latency issues Some more work tells me that the end to end latency numbers vary with the number of partitions I am writing to. I did an experiment where, based on a run time flag, I would dynamically select how many of the *1024 partitions* I write to. So say I decide I'll write to at most 256 partitions: I mod whatever partition I would actually write to by 256. Basically the number of partitions for this topic on the broker remains the same at *1024* partitions, but the number of partitions my producers write to changes dynamically based on a run time flag. So something like this:

int partition = getPartitionForMessage(message);
// This flag can be updated without bringing the application down -
// just a volatile read of some number set externally.
int maxPartitionsToWriteTo = maxPartitionsFlag.get();
int moddedPartition = partition % maxPartitionsToWriteTo;
// Send a message to this Kafka partition.

Here are some interesting things I've noticed:

i) When I start my client and it *never writes* to more than *8 partitions* (same data rate but fewer partitions), the end to end *99th latency is 300 - 350 ms*. Quite a bit of this (numbers in my previous emails) is the latency from producer -> broker and the latency from broker -> consumer. Still nowhere near as poor as the *20 - 30* seconds I was seeing. 
ii) When I increase the maximum number of partitions, end to end latency increases dramatically. At *256 partitions* the end to end *99th latency* is still *390 - 418 ms*: worse than the latency figures for *8* partitions, but not by much. When I increase this number to *512 partitions*, the end to end *99th latency* becomes an intolerable *19 - 24 seconds*. At *1024* partitions the *99th latency* is at *25 - 30 seconds*.

A table of the numbers:

Max partitions written to (of 1024)    End to end 99th latency
8                                      300 - 350 ms
256                                    390 - 418 ms
512                                    19 - 24 seconds
1024                                   25 - 30 seconds

iii) Once I make the max number of partitions high enough, reducing it doesn't help. For example: if I go up from *8* to *512* partitions, the latency goes up. But while the producer is running, if I go down from *512* to *8* partitions, it doesn't reduce the latency numbers. My guess is that the producer is creating some state lazily per partition and this state is causing the latency. Once this state is created, writing to fewer partitions doesn't seem to help. Only a restart of the producer calms things down.

So my current plan is to reduce the number of partitions on the topic, but there seems to be something deeper going on for the latency numbers to be so poor to begin with and then suffer so much more (non-linearly) with additional partitions.

Thanks!

On Sat, Dec 20, 2014 at 6:03 PM, Rajiv Kurian wrote:
>
> I've done some more measurements. I've also started measuring the latency
> between when I ask my producer to send a message and when I get an
> acknowledgement via the callback. Here is my code:
>
> // This function is called on every producer once every 30 seconds. 
> > public void addLagMarkers(final Histogram enqueueLag) { > > final int numberOfPartitions = 1024; > > final long timeOfEnqueue = System.currentTimeMillis(); > > final Callback callback = new Callback() { > > @Override > > public void onCompletion(RecordMetadata metadata, Exception ex) > { > > if (metadata != null) { > > // The difference between ack time from broker and > enqueue time. > > final long timeOfAck = System.currentTimeMillis(); > > final long lag = timeOfAck - timeOfEnqueue; > > enqueueLag.update(lag); > > } > > } > > }; > > for (int i = 0; i < numberOfPartitions; i++) { > > try { > > byte[] value = LagMarker.serialize(timeOfEnqueue); // 10 > bytes -> short version + long timestamp. > > // This message is later used by the consumers to measure > lag. > > ProducerRecord record = new ProducerRecord(MY_TOPIC, i, > null, value); > > kafkaProducer.send(record, callback); > > } catch (Exception e) { > > // We just dropped a lag marker. > > } > > } > > } > > The* 99th* on this lag i
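The lag measurement in the callback above reduces to one subtraction per acknowledgement, and that part can be exercised without a broker by driving the callback logic directly. A sketch with stand-in types: the real callback is Kafka's Callback/RecordMetadata pair, while `LagProbe` and `onAck` here are my names:

```java
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for the enqueue-lag measurement above: remember the enqueue
// time, then record ack time minus enqueue time when the ack arrives.
public class LagProbe {
    private final long timeOfEnqueue;
    final AtomicLong lastLagMs = new AtomicLong(-1);

    LagProbe(long timeOfEnqueue) {
        this.timeOfEnqueue = timeOfEnqueue;
    }

    // Mirrors onCompletion(metadata, exception): only a successful ack
    // (metadata != null in the real callback) updates the measurement.
    void onAck(long timeOfAck, boolean ackSucceeded) {
        if (ackSucceeded) {
            lastLagMs.set(timeOfAck - timeOfEnqueue);
        }
    }
}
```

Note that this number covers producer-side queueing plus broker acknowledgement only, so it isolates the producer from the consumer path, which is useful for testing the theory that lazily created per-partition producer state is what drives the multi-second figures.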
Re: Kafka 0.8.2 new producer blocking on metadata
Sounds good. Yes, I'd want a guarantee that every future I get will always return the recordmeta or an exception eventually. Running into a similar issue with futures never returning with a pretty straightforward case: - Healthy producer/server setup - Stop the server - Send a message - Call get on the future and it never returns. Doesn't matter if the server is started again or remains stopped Stepping into the producer code, it appears that in Sender.run a response never comes back for the send while things are down and handleDisconnect is never called. These are basically the same tests I am running against the 0.8.1.1 producer, but I could have some wires crossed, so would be curious if others see similar. Thanks, Paul On Fri, Dec 19, 2014 at 5:27 PM, Jay Kreps wrote: > Yeah, if you want to file a JIRA and post a patch for a new option it's > possible others would want it. Maybe something like > pre.initialize.topics=x,y,z > pre.initialize.timeout=x > > The metadata fetch timeout is a bug... that behavior is inherited from > Object.wait, which defines zero to mean infinite, but I think that is not > very intuitive. If you file a ticket on that we could just fix it. I think > being able to set 0 is actually useful for this case you are trying for. > > WRT stopping the producer, I think what you are saying is that you want > it to be the case that calling close() on the producer immediately fails > all outstanding requests with some exception, right? > > -Jay > > On Fri, Dec 19, 2014 at 1:55 PM, Paul Pearcy wrote: > > > > Hi Jay, > > I have implemented a wrapper around the producer to behave like I want > it > > to. 
Where it diverges from the current 0.8.2 producer is that it accepts > three > > new inputs: > > - A list of expected topics > > - A timeout value to init metadata for those topics during producer creation > > - An option to blow up if we fail to init topic metadata within some amount > of > > time > > > > I also needed to set metadata.fetch.timeout.ms=1, as 0 means it will > > block > > forever, and kick off a thread to do the topic metadata init in the > > background. > > > > On the send side, things do fail fast now. The only current hiccup (not > > completely done re-working my tests, though) I am hitting now is that > > messages accepted by the producer after the server has been stopped > never > > return a status if the producer is stopped; I think this is a bug. > > > > Are you sure you wouldn't want any of this behavior in the client by default, > > which would give out of the box choices to be made on blocking behavior? > > Happy to share code or send a PR. > > > > Thanks, > > Paul > > > > On Fri, Dec 19, 2014 at 2:05 PM, Jay Kreps wrote: > > > > > Hey Paul, > > > > > > I agree we should document this better. > > > > > > We allow and encourage using partitions to semantically distribute > data. > > So > > > unfortunately we can't just arbitrarily assign a partition (say 0), as > > that > > > would actually give incorrect answers for any consumer that made use of > > the > > > partitioning. It is true that the user can change the partitioning, but > > we > > > can't ignore the partitioning they have set. > > > > > > I get the use case you have--you basically want a hard guarantee that > > > send() will never block (so presumably you have set it to also drop data > if > > > the buffer fills up). As I said, the blocking only occurs on the first > > > request for a given topic, and you can avoid it by pre-initializing the > > > topic metadata. > > > > > > I think the option you describe is actually possible now. 
Basically you > > can > > > initialize the metadata for topics you care about using that > > > partitionsFor() call. If you set the property > metadata.fetch.timeout.ms > > =0 > > > then any send calls prior to the completion of metadata initialization > > will > > > fail immediately rather than block. > > > > > > -Jay > > > > > > > > > On Fri, Dec 19, 2014 at 9:32 AM, Paul Pearcy > wrote: > > > > > > > > Hi Jay, > > > > Many thanks for the info. All that makes sense, but from an API > > > > standpoint when something is labelled async and returns a Future, > this > > > will > > > > be misconstrued and developers will place async sends in critical > > client > > > > facing request/response pathways of code that should never block. If > > the > > > > app comes up with a bad config, it will hang all incoming > connections. > > > > > > > > Obviously, there is a spectrum of use cases with regard to message > loss > > > and > > > > the defaults cannot cater to all use cases. I like that the defaults > > tend > > > > towards best effort guarantees, but I am not sure it justifies the > > > > inconsistency in the API. > > > > > > > > 1) It sounds like the client is already structured to handle changes > in > > > > partitions on the fly, I am sure I am over simplifying but in the > case > > > > where no meta is available, but my naive approach would be assume > some > > > > number of partitions and t