sending mailchimp data to kafka cluster using producer api
Hello sir, I want to send Mailchimp data to a Kafka broker (topic) using the Producer API. Could you please help me?
Re: producer api
Thank you Erik. But in my setup, there is only one node whose public IP is provided in my broker cluster, so I can only use one bootstrap broker for now. On Mon, Sep 14, 2015 at 3:50 PM, Helleren, Erik wrote: > You only need one of the brokers to connect for publishing. Kafka will > tell the client about all the other brokers. But best practices state > including all of them is best. > -Erik > > On 9/14/15, 2:46 PM, "Yuheng Du" wrote: > > >I am writing a kafka producer application in java. I want the producer to > >publish data to a cluster of 6 brokers. Is there a way to specify only the > >load balancing node but not all the brokers list? > > > >For example, like in the benchmarking kafka commands: > > > >bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance > >test 5000 100 -1 acks=-1 bootstrap.servers= > >esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 > batch.size=64000 > > > >it only specifies the bootstrap server node but not all the broker list > >like: > > > >Properties props = new Properties(); > > > >props.put("metadata.broker.list", "broker1:9092,broker2:9092"); > > > > > > > >Thanks for replying.
Re: producer api
You only need one of the brokers to connect for publishing. Kafka will tell the client about all the other brokers. But best practice is to include all of them. -Erik On 9/14/15, 2:46 PM, "Yuheng Du" wrote: >I am writing a kafka producer application in java. I want the producer to >publish data to a cluster of 6 brokers. Is there a way to specify only the >load balancing node but not all the brokers list? > >For example, like in the benchmarking kafka commands: > >bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance >test 5000 100 -1 acks=-1 bootstrap.servers= >esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=64000 > >it only specifies the bootstrap server node but not all the broker list >like: > >Properties props = new Properties(); > >props.put("metadata.broker.list", "broker1:9092,broker2:9092"); > > > >Thanks for replying.
producer api
I am writing a kafka producer application in java. I want the producer to publish data to a cluster of 6 brokers. Is there a way to specify only the load balancing node rather than the full broker list? For example, like in the benchmarking kafka commands: bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test 5000 100 -1 acks=-1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=64000 it only specifies the bootstrap server node but not the full broker list like: Properties props = new Properties(); props.put("metadata.broker.list", "broker1:9092,broker2:9092"); Thanks for replying.
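For reference, a minimal sketch of the new-producer style configuration the benchmark command above implies. The tuning values are taken from that command; `broker1:9092` is a placeholder address. Only the bootstrap list is required: the client fetches cluster metadata from any listed broker and discovers the rest.

```java
import java.util.Properties;

public class ProducerConfigExample {
    // Minimal new-producer (0.8.2+) style configuration. Only
    // bootstrap.servers is needed for discovery: the client fetches
    // cluster metadata from any seed broker and learns the others.
    static Properties buildConfig(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers); // one or more seed brokers
        props.put("acks", "-1");                          // wait for all in-sync replicas
        props.put("buffer.memory", "67108864");           // values from the benchmark command
        props.put("batch.size", "64000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig("broker1:9092"); // placeholder address
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

Listing a single reachable broker works, but listing several protects the client against that one broker being down at startup.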
Re: New Producer API - batched sync mode support
@Shapira You are correct from my perspective. We are using kafka for a system where panels can send multiple events in a single message. The current contract is such that all events fail or succeed as a whole. If there is a failure the panel resends all the events. The existing producer api supports this fine, am I getting left behind here for the sake of brevity? I can get behind not adding every feature people ask for but taking away something is a different story all together. On Wed, Apr 29, 2015 at 9:08 PM, Gwen Shapira wrote: > I'm starting to think that the old adage "If two people say you are drunk, > lie down" applies here :) > > Current API seems perfectly clear, useful and logical to everyone who wrote > it... but we are getting multiple users asking for the old batch behavior > back. > One reason to get it back is to make upgrades easier - people won't need to > rethink their existing logic if they get an API with the same behavior in > the new producer. The other reason is what Ewen mentioned earlier - if > everyone re-implements Joel's logic, we can provide something for that. > > How about getting the old batch send behavior back by adding a new API > with: > public void batchSend(List>) > > With this implementation (mixes the old behavior with Joel's snippet): > * send records one by one > * flush > * iterate on futures and "get" them > * log a detailed message on each error > * throw an exception if any send failed. > > It reproduces the old behavior - which apparently everyone really liked, > and I don't think it is overly weird. It is very limited, but anyone who > needs more control over his sends already have plenty of options. > > Thoughts? > > Gwen > > > > > On Tue, Apr 28, 2015 at 5:29 PM, Jay Kreps wrote: > > > Hey guys, > > > > The locking argument is correct for very small records (< 50 bytes), > > batching will help here because for small records locking becomes the big > > bottleneck. 
I think these use cases are rare but not unreasonable. > > > > Overall I'd emphasize that the new producer is way faster at virtually > all > > use cases. If there is a use case where that isn't true, let's look at it > > in a data driven way by comparing the old producer to the new producer > and > > looking for any areas where things got worse. > > > > I suspect the "reducing allocations" argument to be not a big thing. We > do > > a number of small per-message allocations and it didn't seem to have much > > impact. I do think there are a couple of big producer memory > optimizations > > we could do by reusing the arrays in the accumulator in the serialization > > of the request but I don't think this is one of them. > > > > I'd be skeptical of any api that was too weird--i.e. introduces a new way > > of partitioning, gives back errors on a per-partition rather than per > > message basis (given that partitioning is transparent this is really hard > > to think about), etc. Bad apis end up causing a ton of churn and just > don't > > end up being a good long term commitment as we change how the underlying > > code works over time (i.e. we hyper optimize for something then have to > > maintain some super weird api as it becomes hyper unoptimized for the > > client over time). > > > > Roshan--Flush works as you would hope, it blocks on the completion of all > > outstanding requests. Calling get on the future for the request gives you > > the associated error code back. Flush doesn't throw any exceptions > because > > waiting for requests to complete doesn't error, the individual requests > > fail or succeed which is always reported with each request. > > > > Ivan--The batches you send in the scala producer today actually aren't > > truely atomic, they just get sent in a single request. > > > > One tricky problem to solve when user's do batching is size limits on > > requests. 
This can be very hard to manage since predicting the serialized > > size of a bunch of java objects is not always obvious. This was > repeatedly > > a problem before. > > > > -Jay > > > > On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov > > wrote: > > > > > I must agree with @Roshan – it's hard to imagine anything more > intuitive > > > and easy to use for atomic batching as old sync batch api. Also, it's > > fast. > > > Coupled with a separate instance of producer per > > > broker:port:topic:partition it works very well. I would be glad if it > > finds > > > its way into new producer api. > > > > > > On a side-side-side note, could anyone confirm/deny if SimpleConsumer's > > > fetchSize must be set at least as batch bytes (before or after > > > compression), otherwise client risks not getting any messages? > > > > > >
Re: Compatibility of 0.8.2 client API (new Producer API) and 0.8.1 Kafka server
Hi Jiangjie, Thanks for the reply. One small concern: kafka-0.8.1 builds with scala-2.9.1 while kafka-0.8.2 builds with scala-2.10. Anyway, I will test it soon to confirm. Thanks. Best Regards, Zhuo Liu Ph.D. Student, CSSE department Auburn University, AL 36849 http://www.auburn.edu/~zzl0014/ From: Jiangjie Qin Sent: Wednesday, May 27, 2015 7:18 PM To: users@kafka.apache.org Subject: Re: Compatibility of 0.8.2 client API (new Producer API) and 0.8.1 Kafka server It should work, but usually we prefer the server version to be not lower than the client version. On 5/27/15, 3:12 PM, "Zhuo Liu" wrote: >Dear all, > > >In 0.8.2.1 Kafka, there is a new Producer API (KafkaProducer etc.). > >My question is: will the 0.8.2.1 new Producer API > >be able to successfully talk to a Kafka server cluster running > >with 0.8.1.1 Kafka? > > >Thanks very much! > > >Best Regards, >Zhuo Liu >Ph.D. Student, CSSE department >Auburn University, AL 36849 >http://www.auburn.edu/~zzl0014/
Re: Compatibility of 0.8.2 client API (new Producer API) and 0.8.1 Kafka server
It should work, but usually we prefer the server version to be not lower than the client version. On 5/27/15, 3:12 PM, "Zhuo Liu" wrote: >Dear all, > > >In 0.8.2.1 Kafka, there is a new Producer API (KafkaProducer etc.). > >My question is: will the 0.8.2.1 new Producer API > >be able to successfully talk to a Kafka server cluster running > >with 0.8.1.1 Kafka? > > >Thanks very much! > > >Best Regards, >Zhuo Liu >Ph.D. Student, CSSE department >Auburn University, AL 36849 >http://www.auburn.edu/~zzl0014/
Compatibility of 0.8.2 client API (new Producer API) and 0.8.1 Kafka server
Dear all, In 0.8.2.1 Kafka, there is a new Producer API (KafkaProducer etc.). My question is: will the 0.8.2.1 new Producer API be able to successfully talk to a Kafka server cluster running 0.8.1.1 Kafka? Thanks very much! Best Regards, Zhuo Liu Ph.D. Student, CSSE department Auburn University, AL 36849 http://www.auburn.edu/~zzl0014/
Re: New Producer API Design
Thanks guys, I got the point. We ended up handling the ser/de in custom wrappers over the generic byte[],byte[] producer, so the application keeps a single producer without losing type safety. On Wed, May 13, 2015 at 11:31 PM, Guozhang Wang wrote: > Hello Mohit, > > When we originally design the new producer API we removed the serializer / > deserializer from the old producer and made it generic as accepting only > message, but we later concluded it would still be more > beneficial to add the serde back into the producer API. And as you observed > one consequence is that if you have multiple data serialization formats > then you have to use one producer for each. > > The recommended way of using Kafka producer in production is to stick with > one serialization format, e.g. Avro, with different topics to have > different schemas that could still be ser-/de- by the single producer. > > Guozhang > > On Wed, May 13, 2015 at 10:03 AM, Mohit Gupta < > success.mohit.gu...@gmail.com > > wrote: > > > Hello, > > > > I've a question regarding the design of the new Producer API. > > > > As per the design (KafkaProducer), it seems that a separate producer > > is required for every combination of key and value type. Where as, in > > documentation ( and elsewhere ) it's recommended to create a single > > producer instance per application and share it among the all the threads > > for best performance? > > > > One way to create only single producer would be to use byte[] as > key/value > > type and handle the serialization at the client itself, rather than the > > producer, similar to the example in javadocs. But wouldn't this defeat > the > > purpose of using generics in the producer? > > > > Specific to our use case, we have multiple types of messages, where each > > message type can have multiple custom serializers. And, a message can be > > pushed into mulitple topics with different serialization. 
> > > > > > -- > > Best Regards, > > > > Mohit Gupta > > > > > > -- > -- Guozhang > -- Best Regards, Mohit Gupta
Re: New Producer API Design
Hello Mohit, When we originally designed the new producer API we removed the serializer / deserializer from the old producer and made it generic, accepting only <byte[], byte[]> messages, but we later concluded it would still be more beneficial to add the serde back into the producer API. And as you observed, one consequence is that if you have multiple data serialization formats then you have to use one producer for each. The recommended way of using the Kafka producer in production is to stick with one serialization format, e.g. Avro, and let different topics have different schemas that can still be serialized/deserialized by the single producer. Guozhang On Wed, May 13, 2015 at 10:03 AM, Mohit Gupta wrote: > Hello, > > I've a question regarding the design of the new Producer API. > > As per the design (KafkaProducer), it seems that a separate producer > is required for every combination of key and value type. Where as, in > documentation ( and elsewhere ) it's recommended to create a single > producer instance per application and share it among the all the threads > for best performance? > > One way to create only single producer would be to use byte[] as key/value > type and handle the serialization at the client itself, rather than the > producer, similar to the example in javadocs. But wouldn't this defeat the > purpose of using generics in the producer? > > Specific to our use case, we have multiple types of messages, where each > message type can have multiple custom serializers. And, a message can be > pushed into mulitple topics with different serialization. > > > -- > Best Regards, > > Mohit Gupta > -- -- Guozhang
Re: New Producer API Design
You can of course use a KafkaProducer<Object, Object> to get a producer interface that can accept a variety of types. For example, if you have an Avro serializer that accepts both primitive types (e.g. String, integer types) and complex types (e.g. records, arrays, maps), Object is the only type you can use to cover all of those. As long as your serializer supports it, you can use a general type and pass in a variety of types to a single producer. The drawback is that you don't get feedback at compile time if you pass in a type that you weren't expecting. For example, if you know your keys are always going to be Strings, it's probably a good idea to use a KafkaProducer with a String key type so that you catch a case where you accidentally pass in a different object. There are a lot of use cases where an application is only producing a single format of data, so supporting the type checking can be valuable. The type checking isn't going to be perfect because of type erasure and since serializers are often instantiated via reflection. However, having the type information can offer some compile-time protection to application code using the clients. -Ewen On Wed, May 13, 2015 at 10:03 AM, Mohit Gupta wrote: > Hello, > > I've a question regarding the design of the new Producer API. > > As per the design (KafkaProducer), it seems that a separate producer > is required for every combination of key and value type. Where as, in > documentation ( and elsewhere ) it's recommended to create a single > producer instance per application and share it among the all the threads > for best performance? > > One way to create only single producer would be to use byte[] as key/value > type and handle the serialization at the client itself, rather than the > producer, similar to the example in javadocs. But wouldn't this defeat the > purpose of using generics in the producer? > > Specific to our use case, we have multiple types of messages, where each > message type can have multiple custom serializers. 
And, a message can be > pushed into mulitple topics with different serialization. > > > -- > Best Regards, > > Mohit Gupta > -- Thanks, Ewen
New Producer API Design
Hello, I have a question regarding the design of the new Producer API. As per the design (KafkaProducer<K, V>), it seems that a separate producer is required for every combination of key and value type. Whereas the documentation (and elsewhere) recommends creating a single producer instance per application and sharing it among all the threads for best performance? One way to create only a single producer would be to use byte[] as the key/value type and handle the serialization at the client itself, rather than the producer, similar to the example in the javadocs. But wouldn't this defeat the purpose of using generics in the producer? Specific to our use case, we have multiple types of messages, where each message type can have multiple custom serializers. And, a message can be pushed into multiple topics with different serialization. -- Best Regards, Mohit Gupta
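The type-safety trade-off discussed in this thread can be illustrated with a plain stand-in for Kafka's Serializer interface (the names below are illustrative, not Kafka's actual classes): a wide Object-typed serializer accepts many types but rejects bad input only at runtime, while a narrow String-typed one rejects it at compile time.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class GenericProducerSketch {
    // Stand-in for Kafka's Serializer interface; illustrative only.
    interface Serializer<T> { byte[] serialize(T data); }

    // Wide Object serializer: covers many types, but fails late, at runtime.
    static final Serializer<Object> WIDE = data -> {
        if (data instanceof String)  return ((String) data).getBytes(StandardCharsets.UTF_8);
        if (data instanceof Integer) return ByteBuffer.allocate(4).putInt((Integer) data).array();
        throw new IllegalArgumentException("unsupported type: " + data.getClass());
    };

    // Narrow String serializer: wrong types are rejected at compile time.
    static final Serializer<String> NARROW = s -> s.getBytes(StandardCharsets.UTF_8);

    public static void main(String[] args) {
        System.out.println(WIDE.serialize("hello").length);   // 5
        System.out.println(WIDE.serialize(42).length);        // 4
        System.out.println(NARROW.serialize("hello").length); // 5
        // NARROW.serialize(42);  // would not even compile: the mistake is caught early
    }
}
```

This is the same trade-off Ewen describes: one wide producer per application versus compile-time checks per key/value type.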
Re: Doubt regarding new Producer and old Producer API
Hi Madhukar, 1. The java producer API can also be used for synchronous calls; you can do it with producer.send().get(). 2. The partitioner class has been removed from the new producer API; instead, the message can now carry a specific partition id. You could calculate the partition id in your customized partitioner beforehand and put the partition id into the message. Guozhang On Wed, May 13, 2015 at 4:05 AM, Madhukar Bharti wrote: > Hi all, > > What are the possible use cases for using new producer API? > - Is this only provides async call with callback feature? > - Is partitioner class has been removed from new Producer API? if not then > how to implement it if I want to use only client APIs? > > > > Regards, > Madhukar > -- -- Guozhang
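Guozhang's first point, that a sync send is just a blocking get() on the returned future, can be sketched with a stand-in future. No broker or Kafka classes are involved here; with the real client the idiom is producer.send(record).get().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class SyncSendSketch {
    // Stand-in for the new producer's async send(), which returns a
    // Future of the record's metadata (here just a fake offset).
    static Future<Long> send(String message) {
        return CompletableFuture.completedFuture((long) message.length());
    }

    // Blocking on the returned future turns the async API into a
    // synchronous call, mirroring producer.send(record).get().
    static long sendSync(String message) throws Exception {
        return send(message).get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(sendSync("hello")); // 5
    }
}
```

With the real producer, get() also surfaces any send error as an ExecutionException, so failures become ordinary exceptions in the calling thread.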
Doubt regarding new Producer and old Producer API
Hi all, What are the possible use cases for using the new producer API? - Does it only provide async calls with a callback feature? - Has the partitioner class been removed from the new Producer API? If not, how do I implement one if I want to use only the client APIs? Regards, Madhukar
Re: Differences between new and legacy scala producer API
On Thu, May 7, 2015 at 10:01 PM, Rendy Bambang Junior < rendy.b.jun...@gmail.com> wrote: > Hi > > - Legacy scala api for producer is having keyed message along with topic, > key, partkey, and message. Meanwhile new api has no partkey. Whats the > difference between key and partkey? > In the new API, key is encapsulated in the ProducerRecord. regards -- http://khangaonkar.blogspot.com/
Differences between new and legacy scala producer API
Hi - The legacy Scala producer API has a KeyedMessage carrying topic, key, partKey, and message, while the new API has no partKey. What's the difference between key and partKey? - In the javadoc, the new producer API's send method is always async; is the producer.type property overridden? - Will the legacy Scala API be deprecated any time soon? Rendy
Re: New Producer API - batched sync mode support
Roshan, If I understand correctly, you just want to make sure a number of messages has been sent successfully. Using a callback might be easier to do so. public class MyCallback implements Callback { private final Set<RecordMetadata> failedSend = new HashSet<>(); @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) failedSend.add(metadata); } public boolean hasFailure() { return !failedSend.isEmpty(); } } In the main code, you just need to do the following: { MyCallback callback = new MyCallback(); for (ProducerRecord record : records) producer.send(record, callback); producer.flush(); if (callback.hasFailure()) // do something } This will avoid the loop checking and provide you pretty much the same guarantee as the old producer, if not better. Jiangjie (Becket) Qin On 4/30/15, 4:54 PM, "Roshan Naik" wrote: >@Gwen, @Ewen, > While atomicity of a batch is nice to have, it is not essential. I don't >think users always expect such atomicity. Atomicity is not even guaranteed >in many un-batched systems let alone batched systems. > >As long as the client gets informed about the ones that failed in the >batch.. that would suffice. > >One issue with the current flush() based batch-sync implementation is that >the client needs to iterate over *all* futures in order to scan for any >failed messages. In the common case, it is just wasted CPU cycles as there >won't be any failures. Would be ideal if the client is informed about only >problematic messages. > > IMO, adding a new send(batch) API may be meaningful if it can provide >benefits beyond what user can do with a simple wrapper on existing stuff. >For example: eliminate the CPU cycles wasted on examining results from >successful message deliveries, or other efficiencies. > > > >@Ivan, > I am not certain, I am thinking that there is a possibility that the >first few messages of the batch got accepted, but not the remainder ? 
At >the same time based on some comments made earlier it appears underlying >implementation does have an all-or-none mechanism for a batch going to a >partition. >For simplicity, streaming clients may not want to deal explicitly with >partitions (and get exposed to repartitioning & leader change type issues) > >-roshan > > > >On 4/30/15 2:07 PM, "Gwen Shapira" wrote: > >>Why do we think atomicity is expected, if the old API we are emulating >>here >>lacks atomicity? >> >>I don't remember emails to the mailing list saying: "I expected this >>batch >>to be atomic, but instead I got duplicates when retrying after a failed >>batch send". >>Maybe atomicity isn't as strong requirement as we believe? That is, >>everyone expects some duplicates during failure events and handles them >>downstream? >> >> >> >>On Thu, Apr 30, 2015 at 2:02 PM, Ivan Balashov >>wrote: >> >>> 2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava : >>> >>> > They aren't going to get this anyway (as Jay pointed out) given the >>> current >>> > broker implementation >>> > >>> >>> Is it also incorrect to assume atomicity even if all messages in the >>>batch >>> go to the same partition? >>> >
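The callback pattern Jiangjie sketches above can be run as-is with plain stdlib stand-ins: Kafka's RecordMetadata is replaced by a String id so no broker is needed, but the idea is identical, remember only the failures and check once at the end.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class CallbackTrackingSketch {
    // Stand-in for a Kafka Callback: records only the failed sends.
    // String ids stand in for RecordMetadata in this sketch.
    static class TrackingCallback {
        final Set<String> failed = ConcurrentHashMap.newKeySet();

        void onCompletion(String id, Exception exception) {
            if (exception != null) failed.add(id); // successes cost nothing to track
        }

        boolean hasFailure() { return !failed.isEmpty(); }
    }

    public static void main(String[] args) {
        TrackingCallback callback = new TrackingCallback();
        // Simulate three completed sends, one of which failed.
        callback.onCompletion("msg-1", null);
        callback.onCompletion("msg-2", new RuntimeException("timeout"));
        callback.onCompletion("msg-3", null);
        System.out.println(callback.hasFailure()); // true
        System.out.println(callback.failed);       // [msg-2]
    }
}
```

This is exactly what makes the callback approach cheaper than scanning every future after flush(): in the common all-success case the failure set stays empty and there is nothing to iterate.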
Re: New Producer API - batched sync mode support
@Gwen, @Ewen, While atomicity of a batch is nice to have, it is not essential. I don't think users always expect such atomicity. Atomicity is not even guaranteed in many un-batched systems let alone batched systems. As long as the client gets informed about the ones that failed in the batch.. that would suffice. One issue with the current flush() based batch-sync implementation is that the client needs to iterate over *all* futures in order to scan for any failed messages. In the common case, it is just wasted CPU cycles as there won't be any failures. Would be ideal if the client is informed about only problematic messages. IMO, adding a new send(batch) API may be meaningful if it can provide benefits beyond what user can do with a simple wrapper on existing stuff. For example: eliminate the CPU cycles wasted on examining results from successful message deliveries, or other efficiencies. @Ivan, I am not certain, I am thinking that there is a possibility that the first few messages of the batch got accepted, but not the remainder ? At the same time based on some comments made earlier it appears underlying implementation does have an all-or-none mechanism for a batch going to a partition. For simplicity, streaming clients may not want to deal explicitly with partitions (and get exposed to repartitioning & leader change type issues) -roshan On 4/30/15 2:07 PM, "Gwen Shapira" wrote: >Why do we think atomicity is expected, if the old API we are emulating >here >lacks atomicity? > >I don't remember emails to the mailing list saying: "I expected this batch >to be atomic, but instead I got duplicates when retrying after a failed >batch send". >Maybe atomicity isn't as strong requirement as we believe? That is, >everyone expects some duplicates during failure events and handles them >downstream? 
> > > >On Thu, Apr 30, 2015 at 2:02 PM, Ivan Balashov >wrote: > >> 2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava : >> >> > They aren't going to get this anyway (as Jay pointed out) given the >> current >> > broker implementation >> > >> >> Is it also incorrect to assume atomicity even if all messages in the >>batch >> go to the same partition? >>
Re: New Producer API - batched sync mode support
Why do we think atomicity is expected, if the old API we are emulating here lacks atomicity? I don't remember emails to the mailing list saying: "I expected this batch to be atomic, but instead I got duplicates when retrying after a failed batch send". Maybe atomicity isn't as strong a requirement as we believe? That is, everyone expects some duplicates during failure events and handles them downstream? On Thu, Apr 30, 2015 at 2:02 PM, Ivan Balashov wrote: > 2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava : > > > They aren't going to get this anyway (as Jay pointed out) given the > current > > broker implementation > > > > Is it also incorrect to assume atomicity even if all messages in the batch > go to the same partition? >
Re: New Producer API - batched sync mode support
2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava : > They aren't going to get this anyway (as Jay pointed out) given the current > broker implementation > Is it also incorrect to assume atomicity even if all messages in the batch go to the same partition?
Re: New Producer API - batched sync mode support
> > looking for any areas where things got worse. > > > > I suspect the "reducing allocations" argument to be not a big thing. We > do > > a number of small per-message allocations and it didn't seem to have much > > impact. I do think there are a couple of big producer memory > optimizations > > we could do by reusing the arrays in the accumulator in the serialization > > of the request but I don't think this is one of them. > > > > I'd be skeptical of any api that was too weird--i.e. introduces a new way > > of partitioning, gives back errors on a per-partition rather than per > > message basis (given that partitioning is transparent this is really hard > > to think about), etc. Bad apis end up causing a ton of churn and just > don't > > end up being a good long term commitment as we change how the underlying > > code works over time (i.e. we hyper optimize for something then have to > > maintain some super weird api as it becomes hyper unoptimized for the > > client over time). > > > > Roshan--Flush works as you would hope, it blocks on the completion of all > > outstanding requests. Calling get on the future for the request gives you > > the associated error code back. Flush doesn't throw any exceptions > because > > waiting for requests to complete doesn't error, the individual requests > > fail or succeed which is always reported with each request. > > > > Ivan--The batches you send in the scala producer today actually aren't > > truly atomic, they just get sent in a single request. > > > > One tricky problem to solve when users do batching is size limits on > > requests. This can be very hard to manage since predicting the serialized > > size of a bunch of java objects is not always obvious. This was > repeatedly > > a problem before. 
> > > > -Jay > > > > On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov > > wrote: > > > > > I must agree with @Roshan – it's hard to imagine anything more > intuitive > > > and easy to use for atomic batching as old sync batch api. Also, it's > > fast. > > > Coupled with a separate instance of producer per > > > broker:port:topic:partition it works very well. I would be glad if it > > finds > > > its way into new producer api. > > > > > > On a side-side-side note, could anyone confirm/deny if SimpleConsumer's > > > fetchSize must be set at least as batch bytes (before or after > > > compression), otherwise client risks not getting any messages? > > > > > > -- Thanks, Ewen
Re: New Producer API - batched sync mode support
I'm starting to think that the old adage "If two people say you are drunk, lie down" applies here :) Current API seems perfectly clear, useful and logical to everyone who wrote it... but we are getting multiple users asking for the old batch behavior back. One reason to get it back is to make upgrades easier - people won't need to rethink their existing logic if they get an API with the same behavior in the new producer. The other reason is what Ewen mentioned earlier - if everyone re-implements Joel's logic, we can provide something for that. How about getting the old batch send behavior back by adding a new API with: public void batchSend(List<ProducerRecord>) With this implementation (mixes the old behavior with Joel's snippet): * send records one by one * flush * iterate on futures and "get" them * log a detailed message on each error * throw an exception if any send failed. It reproduces the old behavior - which apparently everyone really liked, and I don't think it is overly weird. It is very limited, but anyone who needs more control over their sends already has plenty of options. Thoughts? Gwen On Tue, Apr 28, 2015 at 5:29 PM, Jay Kreps wrote: > Hey guys, > > The locking argument is correct for very small records (< 50 bytes), > batching will help here because for small records locking becomes the big > bottleneck. I think these use cases are rare but not unreasonable. > > Overall I'd emphasize that the new producer is way faster at virtually all > use cases. If there is a use case where that isn't true, let's look at it > in a data driven way by comparing the old producer to the new producer and > looking for any areas where things got worse. > > I suspect the "reducing allocations" argument to be not a big thing. We do > a number of small per-message allocations and it didn't seem to have much > impact. 
I do think there are a couple of big producer memory optimizations > we could do by reusing the arrays in the accumulator in the serialization > of the request but I don't think this is one of them. > > I'd be skeptical of any api that was too weird--i.e. introduces a new way > of partitioning, gives back errors on a per-partition rather than per > message basis (given that partitioning is transparent this is really hard > to think about), etc. Bad apis end up causing a ton of churn and just don't > end up being a good long term commitment as we change how the underlying > code works over time (i.e. we hyper optimize for something then have to > maintain some super weird api as it becomes hyper unoptimized for the > client over time). > > Roshan--Flush works as you would hope, it blocks on the completion of all > outstanding requests. Calling get on the future for the request gives you > the associated error code back. Flush doesn't throw any exceptions because > waiting for requests to complete doesn't error, the individual requests > fail or succeed which is always reported with each request. > > Ivan--The batches you send in the scala producer today actually aren't > truely atomic, they just get sent in a single request. > > One tricky problem to solve when user's do batching is size limits on > requests. This can be very hard to manage since predicting the serialized > size of a bunch of java objects is not always obvious. This was repeatedly > a problem before. > > -Jay > > On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov > wrote: > > > I must agree with @Roshan – it's hard to imagine anything more intuitive > > and easy to use for atomic batching as old sync batch api. Also, it's > fast. > > Coupled with a separate instance of producer per > > broker:port:topic:partition it works very well. I would be glad if it > finds > > its way into new producer api. 
> > > > On a side-side-side note, could anyone confirm/deny if SimpleConsumer's > > fetchSize must be set at least as batch bytes (before or after > > compression), otherwise client risks not getting any messages? > > >
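A minimal, compilable sketch of the batchSend idea proposed above (send each record, flush, get each future, log failures, throw if any failed). The producer itself is stubbed with a plain Sender function so the example runs without a broker or the Kafka client jar; the names Sender and BatchSendSketch are hypothetical, not part of any Kafka API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class BatchSendSketch {
    // Stand-in for KafkaProducer#send(record), which returns a Future.
    interface Sender<R, M> {
        Future<M> send(R record);
    }

    static <R, M> void batchSend(Sender<R, M> producer, Runnable flush,
                                 List<R> records) throws ExecutionException {
        List<Future<M>> futures = new ArrayList<>();
        for (R record : records) {              // 1. send records one by one
            futures.add(producer.send(record));
        }
        flush.run();                            // 2. flush
        ExecutionException firstError = null;
        for (Future<M> f : futures) {           // 3. iterate on futures and "get" them
            try {
                f.get();
            } catch (Exception e) {             // 4. log a detailed message on each error
                System.err.println("send failed: " + e);
                if (firstError == null) {
                    firstError = new ExecutionException("at least one send in the batch failed", e);
                }
            }
        }
        if (firstError != null) {               // 5. throw an exception if any send failed
            throw firstError;
        }
    }

    public static void main(String[] args) throws Exception {
        // Stub: every record "succeeds" immediately.
        Sender<String, String> stub = r -> CompletableFuture.completedFuture("ack:" + r);
        batchSend(stub, () -> { }, List.of("a", "b", "c"));
        System.out.println("batch ok");
    }
}
```

With the real client, Sender#send would be producer.send(record) and the Runnable would be producer::flush.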
Re: New Producer API - batched sync mode support
Hey guys,

The locking argument is correct for very small records (< 50 bytes), batching will help here because for small records locking becomes the big bottleneck. I think these use cases are rare but not unreasonable.

Overall I'd emphasize that the new producer is way faster at virtually all use cases. If there is a use case where that isn't true, let's look at it in a data driven way by comparing the old producer to the new producer and looking for any areas where things got worse.

I suspect the "reducing allocations" argument to be not a big thing. We do a number of small per-message allocations and it didn't seem to have much impact. I do think there are a couple of big producer memory optimizations we could do by reusing the arrays in the accumulator in the serialization of the request but I don't think this is one of them.

I'd be skeptical of any api that was too weird--i.e. introduces a new way of partitioning, gives back errors on a per-partition rather than per message basis (given that partitioning is transparent this is really hard to think about), etc. Bad apis end up causing a ton of churn and just don't end up being a good long term commitment as we change how the underlying code works over time (i.e. we hyper optimize for something then have to maintain some super weird api as it becomes hyper unoptimized for the client over time).

Roshan--Flush works as you would hope, it blocks on the completion of all outstanding requests. Calling get on the future for the request gives you the associated error code back. Flush doesn't throw any exceptions because waiting for requests to complete doesn't error, the individual requests fail or succeed which is always reported with each request.

Ivan--The batches you send in the scala producer today actually aren't truly atomic, they just get sent in a single request.

One tricky problem to solve when users do batching is size limits on requests. 
This can be very hard to manage since predicting the serialized size of a bunch of java objects is not always obvious. This was repeatedly a problem before. -Jay On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov wrote: > I must agree with @Roshan – it's hard to imagine anything more intuitive > and easy to use for atomic batching as old sync batch api. Also, it's fast. > Coupled with a separate instance of producer per > broker:port:topic:partition it works very well. I would be glad if it finds > its way into new producer api. > > On a side-side-side note, could anyone confirm/deny if SimpleConsumer's > fetchSize must be set at least as batch bytes (before or after > compression), otherwise client risks not getting any messages? >
Re: New Producer API - batched sync mode support
I must agree with @Roshan – it's hard to imagine anything more intuitive and easy to use for atomic batching than the old sync batch api. Also, it's fast. Coupled with a separate instance of producer per broker:port:topic:partition it works very well. I would be glad if it finds its way into the new producer api.

On a side-side-side note, could anyone confirm/deny if SimpleConsumer's fetchSize must be set to at least the batch size in bytes (before or after compression), otherwise the client risks not getting any messages?
Re: New Producer API - batched sync mode support
@Ewen No I did not use compression in my measurements.
Re: New Producer API - batched sync mode support
@Joel, If flush() works for this use case it may be an acceptable starting point (although not as clean as a native batched sync). I am not as yet clear about some aspects of flush's batch semantics and its suitability for this mode of operation. Allow me to explore it with you folks..

1) flush() guarantees: What are the guarantees that one can expect when a flush() call returns? Is it successful delivery of all events in the buffer to the broker as per the configured ack setting?

2) flush() error handling: It does not throw any exception. What is the mechanism for indicating failure in delivery of one or more events in the batch? Is future.get() the way to detect it? If so, will future.get() be applicable to all types of delivery failures (could be a network glitch or something simpler like Kafka responding that it was not able to accept some events)?

3) Multithreaded clients: The situation being that each client thread is trying to push out a batch. flush() will pump out data from not just the calling thread but also from other threads. Need to think through this a bit more to see if that's ok for multithreaded clients.

4) Extra synchronization and object creation: Like Ewen pointed out, this method definitely creates too many (Future) objects and also too much locking/synchronization due to repeated calls to Producer.send() and future.get(). However, if the measured perf impact of this is not too much then I guess it's ok.

@Ewen, I am unable to find the email where I mentioned 10-byte event size. To me 500-byte to 1kB event size is more interesting. I had run measurements with event sizes of 500 bytes, 1k, 4k, 8k and 16k. In the 0.8.1 producer_perf_test tool, the 8k and 16k event sizes showed much better throughput in --sync mode (throughput almost doubled with doubling of event size). The perf tool was not using the Producer.send(list<> ) api though. I saw great improvement when I changed it to use the Producer.send(list<> ). 
Sometimes it easily exceeded the throughput of async mode. -roshan On 4/27/15 10:32 PM, "Ewen Cheslack-Postava" wrote: >A couple of thoughts: > >1. @Joel I agree it's not hard to use the new API but it definitely is >more >verbose. If that snippet of code is being written across hundreds of >projects, that probably means we're missing an important API. Right now >I've only seen the one complaint, but it's worth finding out how many >people feel like it's missing. And given that internally each of the >returned Futures just uses the future for the entire batch, I think it's >probably worth investigating if getting rid of millions of allocs per >second is worth it, even if they should be in the nursery and fast to >collect. > >2. For lots of small messages, there's definitely the potential for a >performance benefit by avoiding a lot of lock acquire/release in send(). >If >you make a first pass to organize by topic partition and then process each >group, you lock # of partitions times rather than # of messages times. One >major drawback I see is that it seems to make a mess of error >handling/blocking when the RecordAccumulator runs out of space. > >3. @Roshan In the other thread you mentioned 10 byte messages. Is this a >realistic payload size for you? I can imagine applications where it is >(and >we should support those well), it just sounds unusually small. > >4. I reproduced Jay's benchmark blog post awhile ago in an automated test >(see >https://github.com/confluentinc/muckrake/blob/master/muckrake/tests/kafka_ >benchmark_test.py). 
>Here's a snippet from the output on m3.2xlarge instances that might help >shed some light on the situation: >INFO:_.KafkaBenchmark:Message size: >INFO:_.KafkaBenchmark: 10: 1637825.195625 rec/sec (15.62 MB/s) >INFO:_.KafkaBenchmark: 100: 605504.877911 rec/sec (57.75 MB/s) >INFO:_.KafkaBenchmark: 1000: 90351.817570 rec/sec (86.17 MB/s) >INFO:_.KafkaBenchmark: 10000: 8306.180862 rec/sec (79.21 MB/s) >INFO:_.KafkaBenchmark: 100000: 978.403499 rec/sec (93.31 MB/s) > >That's using the single-threaded new ProducerPerformance class, so the >m3.2xlarge's # of cores probably has little influence. There's clearly a >sharp increase in throughput from 10 -> 100 byte messages. I recall double >checking that the CPU was fully utilized. Note that this is with the >acks=1 >setting that doesn't actually exist anymore, so take with a grain of salt. > >5. I'd suggest that there may be other APIs that give the implementation >more flexibility but still provide batching. For example: >* Require batched inputs to be prepartitioned so each call specifies the >TopicPartition. Main benefit here is that the producer avoids having to do >all the sorting, which the application may already be doing anyway. >* How about an API similar to fwrite() where you provide a set of messages >but it may only write some of them and tells you how many it wrote? This >could be a clean way to expose the underlying batching that is performed >without being a completely leaky abstraction.
Re: New Producer API - batched sync mode support
A couple of thoughts:

1. @Joel I agree it's not hard to use the new API but it definitely is more verbose. If that snippet of code is being written across hundreds of projects, that probably means we're missing an important API. Right now I've only seen the one complaint, but it's worth finding out how many people feel like it's missing. And given that internally each of the returned Futures just uses the future for the entire batch, I think it's probably worth investigating if getting rid of millions of allocs per second is worth it, even if they should be in the nursery and fast to collect.

2. For lots of small messages, there's definitely the potential for a performance benefit by avoiding a lot of lock acquire/release in send(). If you make a first pass to organize by topic partition and then process each group, you lock # of partitions times rather than # of messages times. One major drawback I see is that it seems to make a mess of error handling/blocking when the RecordAccumulator runs out of space.

3. @Roshan In the other thread you mentioned 10 byte messages. Is this a realistic payload size for you? I can imagine applications where it is (and we should support those well), it just sounds unusually small.

4. I reproduced Jay's benchmark blog post a while ago in an automated test (see https://github.com/confluentinc/muckrake/blob/master/muckrake/tests/kafka_benchmark_test.py). Here's a snippet from the output on m3.2xlarge instances that might help shed some light on the situation:

INFO:_.KafkaBenchmark:Message size:
INFO:_.KafkaBenchmark: 10: 1637825.195625 rec/sec (15.62 MB/s)
INFO:_.KafkaBenchmark: 100: 605504.877911 rec/sec (57.75 MB/s)
INFO:_.KafkaBenchmark: 1000: 90351.817570 rec/sec (86.17 MB/s)
INFO:_.KafkaBenchmark: 10000: 8306.180862 rec/sec (79.21 MB/s)
INFO:_.KafkaBenchmark: 100000: 978.403499 rec/sec (93.31 MB/s)

That's using the single-threaded new ProducerPerformance class, so the m3.2xlarge's # of cores probably has little influence. 
There's clearly a sharp increase in throughput from 10 -> 100 byte messages. I recall double checking that the CPU was fully utilized. Note that this is with the acks=1 setting that doesn't actually exist anymore, so take with a grain of salt. 5. I'd suggest that there may be other APIs that give the implementation more flexibility but still provide batching. For example: * Require batched inputs to be prepartitioned so each call specifies the TopicPartition. Main benefit here is that the producer avoids having to do all the sorting, which the application may already be doing anyway. * How about an API similar to fwrite() where you provide a set of messages but it may only write some of them and tells you how many it wrote? This could be a clean way to expose the underlying batching that is performed without being a completely leaky abstraction. We could then return just a single future for the entire batch, we'd do minimal locking, etc. Not sure how to handle different TopicPartitions in the same set. I think this could be a good pattern for people who want maximally efficient ordered writes where errors are properly handled too. 6. If I recall correctly, doesn't compression occur in a synchronized block, I think in the RecordAccumulator? Or maybe it was in the network thread? In any case, I seem to recall compression also possibly playing an important role in performance because it operates over a set of records which limits where you can run it. @Roshan, are you using compression, both in your microbenchmarks and your application? I think there's almost definitely a good case to be made for a batch API, but probably needs some very clear motivating use cases and perf measurements showing why it's not going to be feasible to accomplish with the current API + a few helpers to wrap it in a batch API. 
-Ewen On Mon, Apr 27, 2015 at 4:24 PM, Joel Koshy wrote: > > > Fine grained tracking of status of individual events is quite painful > in > > contrast to simply blocking on every batch. Old style Batched-sync mode > > has great advantages in terms of simplicity and performance. > > I may be missing something, but I'm not so convinced that it is that > painful/very different from the old-style. > > In the old approach, you would compose a batch (in a list of messages) > and do a synchronous send: > > try { > producer.send(recordsToSend) > } > catch (...) { > // handle (e.g., retry sending recordsToSend) > } > > In the new approach, you would do (something like) this: > > for (record: recordsToSend) { > futureList.add(producer.send(record)); > } > producer.flush(); > for (result: futureList) { > try { result.get(); } > catch (...) { // handle (e.g., retry sending recordsToSend) } > } > > > -- Thanks, Ewen
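Ewen's fwrite()-style idea above (accept a batch, possibly take only a prefix, report how many records were accepted) can be sketched as a retry loop over the unsent suffix. Every name here is a hypothetical illustration of the idea, not an existing Kafka interface:

```java
import java.util.List;

// Hypothetical interface for the fwrite()-like proposal.
interface PartialBatchProducer<R> {
    /** Accepts some prefix of records, in order, and returns how many it took. */
    int sendSome(List<R> records);
}

class PartialBatchDemo {
    // Caller-side loop: keep offering the suffix that was not yet accepted.
    // Assumes sendSome eventually makes progress (returns > 0).
    static <R> void sendAll(PartialBatchProducer<R> p, List<R> records) {
        int sent = 0;
        while (sent < records.size()) {
            sent += p.sendSome(records.subList(sent, records.size()));
        }
    }

    public static void main(String[] args) {
        // Stub that accepts at most two records per call, mimicking a full buffer.
        PartialBatchProducer<String> stub = rs -> Math.min(2, rs.size());
        sendAll(stub, List.of("a", "b", "c", "d", "e"));
        System.out.println("all records accepted");
    }
}
```

The appeal of this shape is that the partial write exposes the underlying batching without per-record Future allocations, at the cost of pushing the retry loop onto the caller.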
Re: New Producer API - batched sync mode support
> Fine grained tracking of status of individual events is quite painful in > contrast to simply blocking on every batch. Old style Batched-sync mode > has great advantages in terms of simplicity and performance.

I may be missing something, but I'm not so convinced that it is that painful/very different from the old-style.

In the old approach, you would compose a batch (in a list of messages) and do a synchronous send:

try {
  producer.send(recordsToSend)
}
catch (...) {
  // handle (e.g., retry sending recordsToSend)
}

In the new approach, you would do (something like) this:

for (record: recordsToSend) {
  futureList.add(producer.send(record));
}
producer.flush();
for (result: futureList) {
  try { result.get(); }
  catch (...) { // handle (e.g., retry sending recordsToSend) }
}
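Joel's new-producer snippet above can be packaged into one compilable helper; this variant collects just the records whose futures failed, so the caller can choose between resending the whole batch (Flume-style) or only the failures. The producer is stubbed with a Function so the sketch runs without a broker; with the real client, send.apply(record) would be producer.send(record) and the Runnable producer::flush:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Function;

class FlushAndCollect {
    /** Sends every record, flushes, then returns the records whose
     *  futures completed exceptionally (empty list = whole batch ok). */
    static <R, M> List<R> sendBatch(Function<R, Future<M>> send,
                                    Runnable flush,
                                    List<R> records) {
        List<Future<M>> futures = new ArrayList<>();
        for (R record : records) {
            futures.add(send.apply(record));   // producer.send(record)
        }
        flush.run();                           // producer.flush()
        List<R> failed = new ArrayList<>();
        for (int i = 0; i < futures.size(); i++) {
            try {
                futures.get(i).get();          // surfaces per-record errors
            } catch (Exception e) {
                failed.add(records.get(i));
            }
        }
        return failed;                         // caller retries these
    }

    public static void main(String[] args) {
        // Stub: records starting with "bad" fail, everything else succeeds.
        Function<String, Future<String>> send = r ->
            r.startsWith("bad")
                ? CompletableFuture.failedFuture(new RuntimeException(r))
                : CompletableFuture.completedFuture("ack");
        List<String> failed = sendBatch(send, () -> { }, List.of("a", "bad1", "b"));
        System.out.println("failed records: " + failed);  // [bad1]
    }
}
```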
Re: New Producer API - batched sync mode support
On 4/27/15 2:59 PM, "Gwen Shapira" wrote: >@Roshan - if the data was already written to Kafka, your approach will >generate LOTS of duplicates. I'm not convinced its ideal.

Only if the delivery failure rate is very high (i.e. short lived but very frequent). These batch semantics are not uncommon; also, since Kafka had sync-batching in the past, it would not be new to Kafka either. But, like I had mentioned, there is an alternative to mitigate this duplication... the return value could indicate failed messages.

Wanted to add.. that these failures typically assume that we got an "error" back from the Kafka broker. There are other modes of failure (like network glitches) which will mean you only get some (potentially not very informative) exception.

> >What's wrong with callbacks?

The complexities of fine grained tracking.. that I described in the previous email.
Re: New Producer API - batched sync mode support
@Roshan - if the data was already written to Kafka, your approach will generate LOTS of duplicates. I'm not convinced it's ideal.

What's wrong with callbacks?

On Mon, Apr 27, 2015 at 2:53 PM, Roshan Naik wrote: > @Gwen > > - A failure in delivery of one or more events in the batch (typical Flume > case) is considered a failure of the entire batch and the client > redelivers the entire batch. > - If clients want more fine grained control, alternative option is to > indicate which events failed in the return value of producer.send(list<>) > > > @Joel > Fine grained tracking of status of individual events is quite painful in > contrast to simply blocking on every batch. Old style Batched-sync mode > has great advantages in terms of simplicity and performance. > Imagine a simple use case of client simply reading a directory of log > files and splitting them into log messages/events and pushing them through > kafka. Becomes more complex when all this tracking data needs to be > persisted to accommodate for client restarts/crashes. Tracking a simple > current line number and file name is easy to program with and persist > to accommodate… as opposed to start/end position of each log message in the > file. > > > -roshan > > > > > > On 4/27/15 2:07 PM, "Joel Koshy" wrote: > > >As long as you retain the returned futures somewhere, you can always > >iterate over the futures after the flush completes and check for > >success/failure. Would that work for you? > > > >On Mon, Apr 27, 2015 at 08:53:36PM +, Roshan Naik wrote: > >> The important guarantee that is needed for a client producer thread is > >> that it requires an indication of success/failure of the batch of events > >> it pushed. Essentially it needs to retry producer.send() on that same > >> batch in case of failure. My understanding is that flush will simply > >>flush > >> data from all threads (correct me if I am wrong). 
> >> > >> -roshan > >> > >> > >> > >> On 4/27/15 1:36 PM, "Joel Koshy" wrote: > >> > >> >This sounds like flush: > >> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+me > >>>th > >> >od+to+the+producer+API > >> > > >> >which was recently implemented in trunk. > >> > > >> >Joel > >> > > >> >On Mon, Apr 27, 2015 at 08:19:40PM +, Roshan Naik wrote: > >> >> Been evaluating the perf of old and new Produce APIs for reliable > >>high > >> >>volume streaming data movement. I do see one area of improvement that > >> >>the new API could use for synchronous clients. > >> >> > >> >> AFAIKT, the new API does not support batched synchronous transfers. > >>To > >> >>do synchronous send, one needs to do a future.get() after every > >> >>Producer.send(). I changed the new > >> >>o.a.k.clients.tools.ProducerPerformance tool to asses the perf of this > >> >>mode of operation. May not be surprising that it much slower than the > >> >>async mode... hard t push it beyond 4MB/s. > >> >> > >> >> The 0.8.1 Scala based producer API supported a batched sync mode via > >> >>Producer.send( List ) . My measurements show that it was > >> >>able to approach (and sometimes exceed) the old async speeds... > >>266MB/s > >> >> > >> >> > >> >> Supporting this batched sync mode is very critical for streaming > >> >>clients (such as flume for example) that need delivery guarantees. > >> >>Although it can be done with Async mode, it requires additional book > >> >>keeping as to which events are delivered and which ones are not. The > >> >>programming model becomes much simpler with the batched sync mode. > >> >>Client having to deal with one single future.get() helps performance > >> >>greatly too as I noted. > >> >> > >> >> Wanted to propose adding this as an enhancement to the new Producer > >>API. > >> > > >> > > > >
Re: New Producer API - batched sync mode support
I should have been clearer - I used Roshan's terminology in my reply. Basically, the old producer "batch" Send() just took a sequence of messages. I assumed Roshan is looking for something similar - which allows for mixing messages for multiple partitions and therefore can fail for some messages and succeed for others.

This is unrelated to MessageSet, which is for a specific partition and indeed fails or succeeds as a whole.

For completeness - the internal RecordAccumulator component of the new KafkaProducer does manage a separate batch for each partition, and these batches should succeed or fail as a whole. I'm not sure I want to expose this level of implementation detail in our API though.

Gwen

On Mon, Apr 27, 2015 at 2:36 PM, Magnus Edenhill wrote: > Hi Gwen, > > can you clarify: by batch do you mean the protocol MessageSet, or some java > client internal construct? > If the former I was under the impression that a produced MessageSet either > succeeds delivery or errors in its entirety on the broker. > > Thanks, > Magnus > > > 2015-04-27 23:05 GMT+02:00 Gwen Shapira : > > > Batch failure is a bit meaningless, since in the same batch, some records > > can succeed and others may fail. > > To implement an error handling logic (usually different than retry, since > > the producer has a configuration controlling retries), we recommend using > > the callback option of Send(). > > > > Gwen > > > > P.S > > Awesome seeing you here, Roshan :) > > > > On Mon, Apr 27, 2015 at 1:53 PM, Roshan Naik > > wrote: > > > > > The important guarantee that is needed for a client producer thread is > > > that it requires an indication of success/failure of the batch of > events > > > it pushed. Essentially it needs to retry producer.send() on that same > > > batch in case of failure. My understanding is that flush will simply > > flush > > > data from all threads (correct me if I am wrong). 
> > > > > > -roshan > > > > > > > > > > > > On 4/27/15 1:36 PM, "Joel Koshy" wrote: > > > > > > >This sounds like flush: > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+meth > > > >od+to+the+producer+API > > > > > > > >which was recently implemented in trunk. > > > > > > > >Joel > > > > > > > >On Mon, Apr 27, 2015 at 08:19:40PM +, Roshan Naik wrote: > > > >> Been evaluating the perf of old and new Produce APIs for reliable > high > > > >>volume streaming data movement. I do see one area of improvement that > > > >>the new API could use for synchronous clients. > > > >> > > > >> AFAIKT, the new API does not support batched synchronous transfers. > To > > > >>do synchronous send, one needs to do a future.get() after every > > > >>Producer.send(). I changed the new > > > >>o.a.k.clients.tools.ProducerPerformance tool to asses the perf of > this > > > >>mode of operation. May not be surprising that it much slower than the > > > >>async mode... hard t push it beyond 4MB/s. > > > >> > > > >> The 0.8.1 Scala based producer API supported a batched sync mode via > > > >>Producer.send( List ) . My measurements show that it > was > > > >>able to approach (and sometimes exceed) the old async speeds... > 266MB/s > > > >> > > > >> > > > >> Supporting this batched sync mode is very critical for streaming > > > >>clients (such as flume for example) that need delivery guarantees. > > > >>Although it can be done with Async mode, it requires additional book > > > >>keeping as to which events are delivered and which ones are not. The > > > >>programming model becomes much simpler with the batched sync mode. > > > >>Client having to deal with one single future.get() helps performance > > > >>greatly too as I noted. > > > >> > > > >> Wanted to propose adding this as an enhancement to the new Producer > > API. > > > > > > > > > > > > >
Re: New Producer API - batched sync mode support
@Gwen

- A failure in delivery of one or more events in the batch (typical Flume case) is considered a failure of the entire batch and the client redelivers the entire batch.
- If clients want more fine grained control, an alternative option is to indicate which events failed in the return value of producer.send(list<>)

@Joel
Fine grained tracking of the status of individual events is quite painful in contrast to simply blocking on every batch. Old style batched-sync mode has great advantages in terms of simplicity and performance. Imagine a simple use case of a client simply reading a directory of log files and splitting them into log messages/events and pushing them through kafka. It becomes more complex when all this tracking data needs to be persisted to accommodate client restarts/crashes. Tracking a simple current line number and file name is easy to program with and persist… as opposed to the start/end position of each log message in the file.

-roshan

On 4/27/15 2:07 PM, "Joel Koshy" wrote: >As long as you retain the returned futures somewhere, you can always >iterate over the futures after the flush completes and check for >success/failure. Would that work for you? > >On Mon, Apr 27, 2015 at 08:53:36PM +, Roshan Naik wrote: >> The important guarantee that is needed for a client producer thread is >> that it requires an indication of success/failure of the batch of events >> it pushed. Essentially it needs to retry producer.send() on that same >> batch in case of failure. My understanding is that flush will simply >>flush >> data from all threads (correct me if I am wrong). >> >> -roshan >> >> >> >> On 4/27/15 1:36 PM, "Joel Koshy" wrote: >> >> >This sounds like flush: >> >>>https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+me >>>th >> >od+to+the+producer+API >> > >> >which was recently implemented in trunk. 
>> > >> >Joel >> > >> >On Mon, Apr 27, 2015 at 08:19:40PM +, Roshan Naik wrote: >> >> Been evaluating the perf of old and new Produce APIs for reliable >>high >> >>volume streaming data movement. I do see one area of improvement that >> >>the new API could use for synchronous clients. >> >> >> >> AFAIKT, the new API does not support batched synchronous transfers. >>To >> >>do synchronous send, one needs to do a future.get() after every >> >>Producer.send(). I changed the new >> >>o.a.k.clients.tools.ProducerPerformance tool to asses the perf of this >> >>mode of operation. May not be surprising that it much slower than the >> >>async mode... hard t push it beyond 4MB/s. >> >> >> >> The 0.8.1 Scala based producer API supported a batched sync mode via >> >>Producer.send( List ) . My measurements show that it was >> >>able to approach (and sometimes exceed) the old async speeds... >>266MB/s >> >> >> >> >> >> Supporting this batched sync mode is very critical for streaming >> >>clients (such as flume for example) that need delivery guarantees. >> >>Although it can be done with Async mode, it requires additional book >> >>keeping as to which events are delivered and which ones are not. The >> >>programming model becomes much simpler with the batched sync mode. >> >>Client having to deal with one single future.get() helps performance >> >>greatly too as I noted. >> >> >> >> Wanted to propose adding this as an enhancement to the new Producer >>API. >> > >> >
Re: New Producer API - batched sync mode support
Hi Gwen, can you clarify: by batch do you mean the protocol MessageSet, or some java client internal construct? If the former I was under the impression that a produced MessageSet either succeeds delivery or errors in its entirety on the broker. Thanks, Magnus 2015-04-27 23:05 GMT+02:00 Gwen Shapira : > Batch failure is a bit meaningless, since in the same batch, some records > can succeed and others may fail. > To implement an error handling logic (usually different than retry, since > the producer has a configuration controlling retries), we recommend using > the callback option of Send(). > > Gwen > > P.S > Awesome seeing you here, Roshan :) > > On Mon, Apr 27, 2015 at 1:53 PM, Roshan Naik > wrote: > > > The important guarantee that is needed for a client producer thread is > > that it requires an indication of success/failure of the batch of events > > it pushed. Essentially it needs to retry producer.send() on that same > > batch in case of failure. My understanding is that flush will simply > flush > > data from all threads (correct me if I am wrong). > > > > -roshan > > > > > > > > On 4/27/15 1:36 PM, "Joel Koshy" wrote: > > > > >This sounds like flush: > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+meth > > >od+to+the+producer+API > > > > > >which was recently implemented in trunk. > > > > > >Joel > > > > > >On Mon, Apr 27, 2015 at 08:19:40PM +, Roshan Naik wrote: > > >> Been evaluating the perf of old and new Produce APIs for reliable high > > >>volume streaming data movement. I do see one area of improvement that > > >>the new API could use for synchronous clients. > > >> > > >> AFAIKT, the new API does not support batched synchronous transfers. To > > >>do synchronous send, one needs to do a future.get() after every > > >>Producer.send(). I changed the new > > >>o.a.k.clients.tools.ProducerPerformance tool to asses the perf of this > > >>mode of operation. 
May not be surprising that it much slower than the > > >>async mode... hard t push it beyond 4MB/s. > > >> > > >> The 0.8.1 Scala based producer API supported a batched sync mode via > > >>Producer.send( List ) . My measurements show that it was > > >>able to approach (and sometimes exceed) the old async speeds... 266MB/s > > >> > > >> > > >> Supporting this batched sync mode is very critical for streaming > > >>clients (such as flume for example) that need delivery guarantees. > > >>Although it can be done with Async mode, it requires additional book > > >>keeping as to which events are delivered and which ones are not. The > > >>programming model becomes much simpler with the batched sync mode. > > >>Client having to deal with one single future.get() helps performance > > >>greatly too as I noted. > > >> > > >> Wanted to propose adding this as an enhancement to the new Producer > API. > > > > > > > >
Re: New Producer API - batched sync mode support
As long as you retain the returned futures somewhere, you can always iterate over the futures after the flush completes and check for success/failure. Would that work for you? On Mon, Apr 27, 2015 at 08:53:36PM +, Roshan Naik wrote: > The important guarantee that is needed for a client producer thread is > that it requires an indication of success/failure of the batch of events > it pushed. Essentially it needs to retry producer.send() on that same > batch in case of failure. My understanding is that flush will simply flush > data from all threads (correct me if I am wrong). > > -roshan > > > > On 4/27/15 1:36 PM, "Joel Koshy" wrote: > > >This sounds like flush: > >https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+meth > >od+to+the+producer+API > > > >which was recently implemented in trunk. > > > >Joel > > > >On Mon, Apr 27, 2015 at 08:19:40PM +, Roshan Naik wrote: > >> Been evaluating the perf of old and new Produce APIs for reliable high > >>volume streaming data movement. I do see one area of improvement that > >>the new API could use for synchronous clients. > >> > >> AFAIKT, the new API does not support batched synchronous transfers. To > >>do synchronous send, one needs to do a future.get() after every > >>Producer.send(). I changed the new > >>o.a.k.clients.tools.ProducerPerformance tool to asses the perf of this > >>mode of operation. May not be surprising that it much slower than the > >>async mode... hard t push it beyond 4MB/s. > >> > >> The 0.8.1 Scala based producer API supported a batched sync mode via > >>Producer.send( List ) . My measurements show that it was > >>able to approach (and sometimes exceed) the old async speeds... 266MB/s > >> > >> > >> Supporting this batched sync mode is very critical for streaming > >>clients (such as flume for example) that need delivery guarantees. > >>Although it can be done with Async mode, it requires additional book > >>keeping as to which events are delivered and which ones are not. 
The > >>programming model becomes much simpler with the batched sync mode. > >>Client having to deal with one single future.get() helps performance > >>greatly too as I noted. > >> > >> Wanted to propose adding this as an enhancement to the new Producer API. > > >
Re: New Producer API - batched sync mode support
Batch failure is a bit meaningless, since in the same batch, some records can succeed and others may fail. To implement an error handling logic (usually different than retry, since the producer has a configuration controlling retries), we recommend using the callback option of Send(). Gwen P.S Awesome seeing you here, Roshan :) On Mon, Apr 27, 2015 at 1:53 PM, Roshan Naik wrote: > The important guarantee that is needed for a client producer thread is > that it requires an indication of success/failure of the batch of events > it pushed. Essentially it needs to retry producer.send() on that same > batch in case of failure. My understanding is that flush will simply flush > data from all threads (correct me if I am wrong). > > -roshan > > > > On 4/27/15 1:36 PM, "Joel Koshy" wrote: > > >This sounds like flush: > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+meth > >od+to+the+producer+API > > > >which was recently implemented in trunk. > > > >Joel > > > >On Mon, Apr 27, 2015 at 08:19:40PM +, Roshan Naik wrote: > >> Been evaluating the perf of old and new Produce APIs for reliable high > >>volume streaming data movement. I do see one area of improvement that > >>the new API could use for synchronous clients. > >> > >> AFAIKT, the new API does not support batched synchronous transfers. To > >>do synchronous send, one needs to do a future.get() after every > >>Producer.send(). I changed the new > >>o.a.k.clients.tools.ProducerPerformance tool to asses the perf of this > >>mode of operation. May not be surprising that it much slower than the > >>async mode... hard t push it beyond 4MB/s. > >> > >> The 0.8.1 Scala based producer API supported a batched sync mode via > >>Producer.send( List ) . My measurements show that it was > >>able to approach (and sometimes exceed) the old async speeds... 
266MB/s > >> > >> > >> Supporting this batched sync mode is very critical for streaming > >>clients (such as flume for example) that need delivery guarantees. > >>Although it can be done with Async mode, it requires additional book > >>keeping as to which events are delivered and which ones are not. The > >>programming model becomes much simpler with the batched sync mode. > >>Client having to deal with one single future.get() helps performance > >>greatly too as I noted. > >> > >> Wanted to propose adding this as an enhancement to the new Producer API. > > > >
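Gwen's per-record callback suggestion can be sketched without a live broker. The Callback interface below is a simplified stand-in for the new producer's org.apache.kafka.clients.producer.Callback, and fakeSend is an invented mock, not a Kafka API; the point is only that delivery outcomes arrive per record, not per batch:

```java
import java.util.ArrayList;
import java.util.List;

public class CallbackSketch {
    // Simplified stand-in for the producer Callback: exactly one of
    // (offset, exception) is meaningful per record.
    interface Callback {
        void onCompletion(long offset, Exception exception);
    }

    // Invented mock send(): even-numbered records "succeed", odd ones "fail",
    // to show that outcomes are per record, not per batch.
    static void fakeSend(int record, Callback cb) {
        if (record % 2 == 0) cb.onCompletion(record, null);
        else cb.onCompletion(-1, new Exception("delivery failed for record " + record));
    }

    static List<Integer> sendBatch(int n) {
        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            final int record = i;
            fakeSend(record, (offset, exception) -> {
                if (exception != null) failed.add(record); // custom error handling goes here
            });
        }
        return failed;
    }

    public static void main(String[] args) {
        System.out.println("failed records: " + sendBatch(6));
    }
}
```

With the real producer, the callback would be the second argument to KafkaProducer.send(), and retries would already have been exhausted by the time it fires with an exception.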
Re: New Producer API - batched sync mode support
The important guarantee that is needed for a client producer thread is that it requires an indication of success/failure of the batch of events it pushed. Essentially it needs to retry producer.send() on that same batch in case of failure. My understanding is that flush will simply flush data from all threads (correct me if I am wrong). -roshan On 4/27/15 1:36 PM, "Joel Koshy" wrote: >This sounds like flush: >https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+meth >od+to+the+producer+API > >which was recently implemented in trunk. > >Joel > >On Mon, Apr 27, 2015 at 08:19:40PM +, Roshan Naik wrote: >> Been evaluating the perf of old and new Produce APIs for reliable high >>volume streaming data movement. I do see one area of improvement that >>the new API could use for synchronous clients. >> >> AFAIKT, the new API does not support batched synchronous transfers. To >>do synchronous send, one needs to do a future.get() after every >>Producer.send(). I changed the new >>o.a.k.clients.tools.ProducerPerformance tool to asses the perf of this >>mode of operation. May not be surprising that it much slower than the >>async mode... hard t push it beyond 4MB/s. >> >> The 0.8.1 Scala based producer API supported a batched sync mode via >>Producer.send( List ) . My measurements show that it was >>able to approach (and sometimes exceed) the old async speeds... 266MB/s >> >> >> Supporting this batched sync mode is very critical for streaming >>clients (such as flume for example) that need delivery guarantees. >>Although it can be done with Async mode, it requires additional book >>keeping as to which events are delivered and which ones are not. The >>programming model becomes much simpler with the batched sync mode. >>Client having to deal with one single future.get() helps performance >>greatly too as I noted. >> >> Wanted to propose adding this as an enhancement to the new Producer API. >
Re: New Producer API - batched sync mode support
This sounds like flush: https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API which was recently implemented in trunk. Joel On Mon, Apr 27, 2015 at 08:19:40PM +, Roshan Naik wrote: > Been evaluating the perf of old and new Produce APIs for reliable high volume > streaming data movement. I do see one area of improvement that the new API > could use for synchronous clients. > > AFAIKT, the new API does not support batched synchronous transfers. To do > synchronous send, one needs to do a future.get() after every Producer.send(). > I changed the new o.a.k.clients.tools.ProducerPerformance tool to asses the > perf of this mode of operation. May not be surprising that it much slower > than the async mode... hard t push it beyond 4MB/s. > > The 0.8.1 Scala based producer API supported a batched sync mode via > Producer.send( List ) . My measurements show that it was able > to approach (and sometimes exceed) the old async speeds... 266MB/s > > > Supporting this batched sync mode is very critical for streaming clients > (such as flume for example) that need delivery guarantees. Although it can be > done with Async mode, it requires additional book keeping as to which events > are delivered and which ones are not. The programming model becomes much > simpler with the batched sync mode. Client having to deal with one single > future.get() helps performance greatly too as I noted. > > Wanted to propose adding this as an enhancement to the new Producer API.
New Producer API - batched sync mode support
Been evaluating the perf of the old and new Producer APIs for reliable high-volume streaming data movement. I do see one area of improvement that the new API could use for synchronous clients. AFAICT, the new API does not support batched synchronous transfers. To do a synchronous send, one needs to do a future.get() after every Producer.send(). I changed the new o.a.k.clients.tools.ProducerPerformance tool to assess the perf of this mode of operation. It may not be surprising that it is much slower than the async mode... hard to push it beyond 4MB/s. The 0.8.1 Scala-based producer API supported a batched sync mode via Producer.send(List<KeyedMessage<K,V>>). My measurements show that it was able to approach (and sometimes exceed) the old async speeds... 266MB/s. Supporting this batched sync mode is very critical for streaming clients (such as Flume, for example) that need delivery guarantees. Although it can be done with async mode, it requires additional bookkeeping as to which events are delivered and which ones are not. The programming model becomes much simpler with the batched sync mode. The client having to deal with one single future.get() helps performance greatly too, as I noted. Wanted to propose adding this as an enhancement to the new Producer API.
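On the new API, the usual way to approximate batched sync is to fire all sends without blocking, keep the futures, and wait once at the end. A minimal sketch of that pattern, with fakeSend as an invented stand-in for KafkaProducer.send() (the real call returns a Future you would collect the same way):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BatchedSyncSketch {
    // Invented stand-in for producer.send(): returns a future that completes
    // when the (simulated) broker acks the record.
    static CompletableFuture<Integer> fakeSend(ExecutorService pool, int record) {
        return CompletableFuture.supplyAsync(() -> record, pool);
    }

    // "Batched sync": send everything asynchronously, then block once
    // for the whole batch instead of calling get() per record.
    static int sendBatchSync(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            futures.add(fakeSend(pool, i));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        return futures.size();
    }

    public static void main(String[] args) {
        System.out.println("acked " + sendBatchSync(100) + " records");
    }
}
```

This keeps the sends pipelined (the slow part of per-record sync is waiting a full round trip between sends), at the cost of the bookkeeping the thread complains about.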
Re: Batching with new Producer API
Oh, that makes a lot more sense! I assumed that the batch size was in terms of number of messages, not the number of bytes because it was so small. What would be a reasonable value to use? Will 1-2 MB be too large and bursty? On Thu, Feb 26, 2015 at 10:07 AM, Harsha wrote: > Akshat, >Produce.batch_size is in bytes and if your messages avg size is >310 bytes and your current number of messages per batch is 46 you >are getting close to the max batch size 16384. Did you try >increasing the producer batch_size bytes? > -Harsha > > On Thu, Feb 26, 2015, at 09:49 AM, Akshat Aranya wrote: > > Hi, > > > > I am using the new Producer API in Kafka 0.8.2. I am writing messages to > > Kafka that are ~310 bytes long with the same partition key to one single > > . > > I'm mostly using the default Producer config, which sets the max batch > > size > > to 16,384. However, looking at the JMX stats on the broker side, I see > > that I'm only getting an average batch size of 46. I also tried > > increasing > > the linger.ms value to 100ms (default is 0), but that didn't help > either. > > Is there something else that I can tune that will improve write batching? > > > > Thanks, > > Akshat >
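For reference, a sketch of the two properties in play. The broker address and the 1 MB value are illustrative assumptions, not recommendations; note that batch.size is an upper bound in bytes per partition batch, so raising it does not by itself delay sends, while linger.ms controls how long the producer will wait to fill a batch:

```java
import java.util.Properties;

public class ProducerTuningSketch {
    static Properties tunedProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");       // hypothetical broker
        props.put("batch.size", String.valueOf(1024 * 1024)); // bytes, not messages
        props.put("linger.ms", "100");                        // max wait to fill a batch
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tunedProps().getProperty("batch.size"));
    }
}
```

Whether 1-2 MB is "too bursty" depends on message rate and latency tolerance; since batch.size is a cap, a larger value mainly lets high-throughput periods batch more, rather than forcing bursts.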
Re: Batching with new Producer API
Akshat, The producer batch.size is in bytes, and if your messages' average size is 310 bytes and your current number of messages per batch is 46, you are getting close to the max batch size of 16384. Did you try increasing the producer batch.size bytes? -Harsha On Thu, Feb 26, 2015, at 09:49 AM, Akshat Aranya wrote: > Hi, > > I am using the new Producer API in Kafka 0.8.2. I am writing messages to > Kafka that are ~310 bytes long with the same partition key to one single > . > I'm mostly using the default Producer config, which sets the max batch > size > to 16,384. However, looking at the JMX stats on the broker side, I see > that I'm only getting an average batch size of 46. I also tried > increasing > the linger.ms value to 100ms (default is 0), but that didn't help > either. > Is there something else that I can tune that will improve write batching? > > Thanks, > Akshat >
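Harsha's arithmetic checks out as a back-of-envelope calculation. The ~46 bytes of per-record overhead below is an assumed illustrative figure (record framing varies by format version), not the exact wire overhead:

```java
public class BatchMath {
    // How many records of a given average size fit in one batch of
    // batch.size bytes, including an assumed fixed per-record overhead.
    static int recordsPerBatch(int batchSizeBytes, int avgRecordBytes, int perRecordOverhead) {
        return batchSizeBytes / (avgRecordBytes + perRecordOverhead);
    }

    public static void main(String[] args) {
        // Default 16384-byte batch, ~310-byte records, assumed ~46-byte overhead
        System.out.println(recordsPerBatch(16384, 310, 46));
    }
}
```

So an observed average of ~46 records per batch is exactly what a full 16 KB batch looks like, which is why raising batch.size (rather than linger.ms) is the lever here.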
Batching with new Producer API
Hi, I am using the new Producer API in Kafka 0.8.2. I am writing messages to Kafka that are ~310 bytes long with the same partition key, so they go to one single partition. I'm mostly using the default Producer config, which sets the max batch size to 16,384. However, looking at the JMX stats on the broker side, I see that I'm only getting an average batch size of 46. I also tried increasing the linger.ms value to 100ms (default is 0), but that didn't help either. Is there something else that I can tune that will improve write batching? Thanks, Akshat
Re: new producer api and batched Futures....
I guess it would make the api less clean, but I can imagine a sendBatch method, which returns a single Future that gets triggered only when all messages in the batch have finished. The callback info could then contain info about the success/exceptions encountered by each sub-group of messages. And the callback could even be called multiple times, once for each sub-batch sent. It gets complicated to think about it, but it would be fewer Future objects created and less async contention/waiting, etc. I'll try it out and see. Jason On Thu, Nov 20, 2014 at 7:56 PM, Jay Kreps wrote: > Internally it works as you describe, there is only one CountDownLatch per > batch sent, each of the futures is just a wrapper around that. > > It is true that if you accumulate thousands of futures in a list that may > be a fair number of objects you are retaining, and there will be some work > involved in checking them all. If you are sure they are all going to the > same partition you can actually wait on the last future since sends are > ordered within a partition. So when the final send completes the prior > sends should also have completed. > > Either way if you see a case where the new producer isn't as fast as the > old producer let us know. > > -Jay > > > > On Thu, Nov 20, 2014 at 4:24 PM, Jason Rosenberg wrote: > > > I've been looking at the new producer api with anticipation, but have not > > fired it up yet. > > > > One question I have, is it looks like there's no longer a 'batch' send > mode > > (and I get that this is all now handled internally, e.g. you send > > individual messages, that then get collated and batched up and sent out). > > > > What I'm wondering, is whether there's added overhead in the producer > (and > > the client code) having to manage all the Future return Objects from all > > the individual messages sent? If I'm sending 100K messages/second, etc., > > that seems like a lot of async Future Objects that have to be tickled, > and > > waited for, etc. 
Does not this cause some overhead? > > > > If I send a bunch of messages and then store all the Future's in a list, > > and then wait for all of them, it seems like a lot of thread contention. > > On the other hand, if I send a batch of messages, that are likely all to > > get sent as a single batch over the wire (cuz they are all going to the > > same partition), wouldn't there be some benefit in only having to wait > for > > a single Future Object for the batch? > > > > Jason > > >
Re: new producer api and batched Futures....
Internally it works as you describe, there is only one CountDownLatch per batch sent, each of the futures is just a wrapper around that. It is true that if you accumulate thousands of futures in a list that may be a fair number of objects you are retaining, and there will be some work involved in checking them all. If you are sure they are all going to the same partition you can actually wait on the last future since sends are ordered within a partition. So when the final send completes the prior sends should also have completed. Either way if you see a case where the new producer isn't as fast as the old producer let us know. -Jay On Thu, Nov 20, 2014 at 4:24 PM, Jason Rosenberg wrote: > I've been looking at the new producer api with anticipation, but have not > fired it up yet. > > One question I have, is it looks like there's no longer a 'batch' send mode > (and I get that this is all now handled internally, e.g. you send > individual messages, that then get collated and batched up and sent out). > > What I'm wondering, is whether there's added overhead in the producer (and > the client code) having to manage all the Future return Objects from all > the individual messages sent? If I'm sending 100K messages/second, etc., > that seems like a lot of async Future Objects that have to be tickled, and > waited for, etc. Does not this cause some overhead? > > If I send a bunch of messages and then store all the Future's in a list, > and then wait for all of them, it seems like a lot of thread contention. > On the other hand, if I send a batch of messages, that are likely all to > get sent as a single batch over the wire (cuz they are all going to the > same partition), wouldn't there be some benefit in only having to wait for > a single Future Object for the batch? > > Jason >
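Jay's "wait on the last future" trick relies on per-partition ordering. The sketch below uses a single-threaded executor as an invented stand-in for one partition's ordered sends (tasks complete in submission order, as sends to one Kafka partition do); it is not the real producer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LastFutureSketch {
    static int sendAndWaitOnLast(int n) {
        // Single-threaded executor = completion in submission order,
        // mimicking sends to a single partition.
        ExecutorService partition = Executors.newSingleThreadExecutor();
        int[] acked = {0};
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            futures.add(CompletableFuture.supplyAsync(() -> ++acked[0], partition));
        }
        // Because completion is ordered, joining only the LAST future
        // guarantees every earlier send has completed too.
        futures.get(futures.size() - 1).join();
        partition.shutdown();
        return acked[0];
    }

    public static void main(String[] args) {
        System.out.println(sendAndWaitOnLast(1000));
    }
}
```

With the real producer the same reasoning holds only for records going to the same partition; across partitions you would still need one wait per partition (or flush()).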
new producer api and batched Futures....
I've been looking at the new producer API with anticipation, but have not fired it up yet. One question I have: it looks like there's no longer a 'batch' send mode (and I get that this is all now handled internally, e.g. you send individual messages, which then get collated, batched up, and sent out). What I'm wondering is whether there's added overhead in the producer (and the client code) having to manage all the Future return objects from all the individual messages sent. If I'm sending 100K messages/second, etc., that seems like a lot of async Future objects that have to be tickled, and waited for, etc. Doesn't this cause some overhead? If I send a bunch of messages and then store all the Futures in a list, and then wait for all of them, it seems like a lot of thread contention. On the other hand, if I send a batch of messages that are likely all to get sent as a single batch over the wire (because they are all going to the same partition), wouldn't there be some benefit in only having to wait for a single Future object for the batch? Jason
Re: how to know kafka producer api status
Seems the email archive doesn't work now at http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/browser And also I sent email to users-subscr...@kafka.apache.org to subscribe to this group; it doesn't work either. 2014-05-09 16:00 GMT+08:00 Yonghui Zhao : > > If l use java producer api in sync mode. > > public void send(kafka.producer.KeyedMessage message) { /* compiled > code */ } > > How to know whether a send process is successful or failed? > > For example if the kafka broker disk is not accessible , will it throw > exceptions? > >
Re: how to know kafka producer api status
Hi Yonghui, If you set producer.type = sync, then the send() call will not return until it has received the ack from the broker, and if the response contains any error code it will retry the send until all retries are exhausted, and then it will throw an exception. Guozhang On Fri, May 9, 2014 at 4:18 AM, Yonghui Zhao wrote: > Seems email archive doesn't work now in > http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/browser > > And also I sent email to users-subscr...@kafka.apache.orgto subscribe > this group, it doesn't work either. > > > 2014-05-09 16:00 GMT+08:00 Yonghui Zhao : > > > > > If l use java producer api in sync mode. > > > > public void send(kafka.producer.KeyedMessage message) { /* > compiled > > code */ } > > > > How to know whether a send process is successful or failed? > > > > For example if the kafka broker disk is not accessible , will it throw > > exceptions? > > > > > -- Guozhang
Re: how to know kafka producer api status
It typically throws an exception in the end if the sync producer cannot deliver your message. In the case where there is an IOException or similar exceptions that the broker cannot deal with, I believe it will try to return an UnknownError response, which will then be thrown in the producer. In cases where it receives error codes that the producer can recover from (e.g. NotLeaderForPartition), it simply retries up to the configured max retries. Tim On Fri, May 9, 2014 at 1:00 AM, Yonghui Zhao wrote: > If l use java producer api in sync mode. > > public void send(kafka.producer.KeyedMessage message) { /* compiled > code */ } > > How to know whether a send process is successful or failed? > > For example if the kafka broker disk is not accessible , will it throw > exceptions?
Re: how to know kafka producer api status
Yes, in sync mode, if send() fails, an exception will be thrown. Thanks, Jun On Fri, May 9, 2014 at 1:00 AM, Yonghui Zhao wrote: > If l use java producer api in sync mode. > > public void send(kafka.producer.KeyedMessage message) { /* compiled > code */ } > > How to know whether a send process is successful or failed? > > For example if the kafka broker disk is not accessible , will it throw > exceptions? >
how to know kafka producer api status
If I use the Java producer API in sync mode: public void send(kafka.producer.KeyedMessage message) { /* compiled code */ } how do I know whether a send succeeded or failed? For example, if the Kafka broker disk is not accessible, will it throw exceptions?
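The behavior the replies describe (the sync producer retries internally and throws only once retries are exhausted) can be sketched with a mock. Both sendWithRetries and SendFailedException below are invented stand-ins, not Kafka APIs; the shape of the caller's try/catch is the point:

```java
public class SyncSendSketch {
    static class SendFailedException extends RuntimeException {
        SendFailedException(String m) { super(m); }
    }

    // Invented stand-in for a sync producer.send(): retries up to maxRetries
    // and throws once they are exhausted, which is how the caller learns
    // delivery failed (e.g. broker disk unavailable).
    static int sendWithRetries(int succeedOnAttempt, int maxRetries) {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            if (attempt >= succeedOnAttempt) {
                return attempt; // simulated broker ack
            }
            // simulated transient failure; loop to retry
        }
        throw new SendFailedException("exhausted " + maxRetries + " retries");
    }

    public static void main(String[] args) {
        System.out.println(sendWithRetries(2, 3)); // transient failure, then success
        try {
            sendWithRetries(10, 3); // broker never recovers within the retry budget
        } catch (SendFailedException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

With the real sync producer, a normal return from send() means the broker acked; catching the exception is the only failure signal.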
Re: Re: why kafka producer api use cpu so high?
If a process is CPU bound (which this producer almost certainly will be), it's going to consume as much CPU as it can to do what it does. The test is flawed. Because there's no end state, the while loop is just going to burn CPU and, because it's single-threaded, it will take a single core. A better test is to find a rough number of events per second your process needs to produce and write the test accordingly. That will tell you, when producing ~50MB/sec worth of events, how much CPU the producer will chew up. The other thing worth pointing out is that sending a single event at a time comes with a fair bit of overhead which, in turn, naturally drives up CPU time. If you use the list form of send(), you're going to amortize the cost of the RPC and other internal bits, leading to more efficient use of system resources. Again, it may still burn a full core because what you're doing is CPU bound, but it will do more during that time. On Sun, May 11, 2014 at 1:04 AM, wrote: > I use snappy for compression. > but even without compression, this procedure also use 50% one core cpu. > > when using snappy ,this procedure use 100% one core cpu. > > > > > > From: Timothy Chen > Date: 2014-05-11 15:53 > To: users@kafka.apache.org > Subject: Re: why kafka producer api use cpu so high? > What is your compression configuration for your producer? > > One of the biggest CPU source for the producer is doing compression > and also checksuming. 
> > Tim > > On Sun, May 11, 2014 at 12:24 AM, wrote: > > I write a very simple code , like this : > > public class LogProducer { > > > > private Producer inner; > > public LogProducer() throws Exception{ > > Properties properties = new Properties(); > > > properties.load(ClassLoader.getSystemResourceAsStream("producer.properties")); > > ProducerConfig config = new ProducerConfig(properties); > > inner = new Producer(config); > > } > > > > > > public void send(String topicName,String message) { > > if(topicName == null || message == null){ > > return; > > } > > KeyedMessage km = new KeyedMessage String>(topicName,message); > > inner.send(km); > > } > > public void close(){ > > inner.close(); > > } > > > > /** > > * @param args > > */ > > public static void main(String[] args) { > > LogProducer producer = null; > > try{ > > producer = new LogProducer(); > > int i=0; > > while(true){ > > producer.send("test", "this is a > sample"); > > } > > }catch(Exception e){ > > e.printStackTrace(); > > }finally{ > > if(producer != null){ > > producer.close(); > > } > > } > > > > } > > > > } > > ~~ > > and the producer.properties like this: > > metadata.broker.list=127.0.0.1:9092 > > producer.type=async > > serializer.class=kafka.serializer.StringEncoder > > batch.num.messages=200 > > compression.codec=snappy > > > > I run this procedure on linux, which is 4 core cpu , 16GB memory. 
> > I find this procedure using one core cpu totally , this is "top" command > ouput: > > > > > > [root@localhost ~]# top > > top - 13:51:09 up 5 days, 13:27, 3 users, load average: 0.96, 0.48, > 0.35 > > Tasks: 367 total, 3 running, 364 sleeping, 0 stopped, 0 zombie > > Cpu0 : 7.0%us, 0.3%sy, 0.0%ni, 92.0%id, 0.7%wa, 0.0%hi, 0.0%si, > 0.0%st > > Cpu1 : 5.0%us, 0.0%sy, 0.0%ni, 95.0%id, 0.0%wa, 0.0%hi, 0.0%si, > 0.0%st > > Cpu2 : 5.0%us, 0.0%sy, 0.0%ni, 95.0%id, 0.0%wa, 0.0%hi, 0.0%si, > 0.0%st > > Cpu3 : 99.7%us, 0.3%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, > 0.0%st > > Mem: 16307528k total, 9398376k used, 6909152k free, 249952k buffers > > Swap: 8224760k total,0k used, 8224760k free, 6071348k cached > > > > why producer api use cpu so high ? or maybe I make something wrong ? > > > > by the way , the kafka version 0.8.0 . > -- E. Sammer CTO - ScalingData
Re: Re: why kafka producer api use cpu so high?
I use snappy for compression. But even without compression, this procedure still uses 50% of one CPU core. When using snappy, it uses 100% of one core. From: Timothy Chen Date: 2014-05-11 15:53 To: users@kafka.apache.org Subject: Re: why kafka producer api use cpu so high? What is your compression configuration for your producer? One of the biggest CPU source for the producer is doing compression and also checksuming. Tim On Sun, May 11, 2014 at 12:24 AM, wrote: > I write a very simple code , like this : > public class LogProducer { > > private Producer inner; > public LogProducer() throws Exception{ > Properties properties = new Properties(); > > properties.load(ClassLoader.getSystemResourceAsStream("producer.properties")); > ProducerConfig config = new ProducerConfig(properties); > inner = new Producer(config); > } > > > public void send(String topicName,String message) { > if(topicName == null || message == null){ > return; > } > KeyedMessage km = new KeyedMessage String>(topicName,message); > inner.send(km); > } > public void close(){ > inner.close(); > } > > /** > * @param args > */ > public static void main(String[] args) { > LogProducer producer = null; > try{ > producer = new LogProducer(); > int i=0; > while(true){ > producer.send("test", "this is a > sample"); > } > }catch(Exception e){ > e.printStackTrace(); > }finally{ > if(producer != null){ > producer.close(); > } > } > > } > > } > ~~ > and the producer.properties like this: > metadata.broker.list=127.0.0.1:9092 > producer.type=async > serializer.class=kafka.serializer.StringEncoder > batch.num.messages=200 > compression.codec=snappy > > I run this procedure on linux, which is 4 core cpu , 16GB memory. 
> I find this procedure using one core cpu totally , this is "top" command > ouput: > > > [root@localhost ~]# top > top - 13:51:09 up 5 days, 13:27, 3 users, load average: 0.96, 0.48, 0.35 > Tasks: 367 total, 3 running, 364 sleeping, 0 stopped, 0 zombie > Cpu0 : 7.0%us, 0.3%sy, 0.0%ni, 92.0%id, 0.7%wa, 0.0%hi, 0.0%si, 0.0%st > Cpu1 : 5.0%us, 0.0%sy, 0.0%ni, 95.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st > Cpu2 : 5.0%us, 0.0%sy, 0.0%ni, 95.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st > Cpu3 : 99.7%us, 0.3%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st > Mem: 16307528k total, 9398376k used, 6909152k free, 249952k buffers > Swap: 8224760k total,0k used, 8224760k free, 6071348k cached > > why producer api use cpu so high ? or maybe I make something wrong ? > > by the way , the kafka version 0.8.0 .
Re: Re: why kafka producer api use cpu so high?
Because my app generates 50MB of logs every second, and one log record is about 1KB, I must send these logs as fast as the machine can. This is very difficult: on one hand I want to send logs as fast as possible, on the other hand I want the Kafka producer API to use as little CPU as possible. If the Kafka API uses this much CPU, it will impact my app. So can Kafka solve this problem: send 50MB of logs to the Kafka server every second while using low CPU? From: cac...@gmail.com Date: 2014-05-11 16:52 To: users Subject: Re: why kafka producer api use cpu so high? This code says to send this message infinitely as fast as the machine can thereby consuming as much of one CPU as possible. You may want to consider an alternate test, perhaps one that records the number of messages sent in a given time period. > > public static void main(String[] args) { > > LogProducer producer = null; > > try{ > > producer = new LogProducer(); > > int i=0; > > while(true){ > > producer.send("test", "this is a > sample"); > > } > > }catch(Exception e){ > > e.printStackTrace(); > > }finally{ > > if(producer != null){ > > producer.close(); > > } > > } > > > > } > > > > } > >
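The stated requirement works out to roughly 51,200 messages per second, assuming MB means 2^20 bytes and records are exactly 1 KiB (if MB means 10^6, the figure is ~50,000/sec). That number is the target rate a fixed-rate benchmark should be written against:

```java
public class ThroughputMath {
    static long messagesPerSecond(long bytesPerSecond, long bytesPerMessage) {
        return bytesPerSecond / bytesPerMessage;
    }

    public static void main(String[] args) {
        // 50 MB/s of ~1 KB records
        System.out.println(messagesPerSecond(50L * 1024 * 1024, 1024));
    }
}
```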
Re: why kafka producer api use cpu so high?
This code says to send this message infinitely as fast as the machine can thereby consuming as much of one CPU as possible. You may want to consider an alternate test, perhaps one that records the number of messages sent in a given time period. > > public static void main(String[] args) { > > LogProducer producer = null; > > try{ > > producer = new LogProducer(); > > int i=0; > > while(true){ > > producer.send("test", "this is a > sample"); > > } > > }catch(Exception e){ > > e.printStackTrace(); > > }finally{ > > if(producer != null){ > > producer.close(); > > } > > } > > > > } > > > > } > >
Re: why kafka producer api use cpu so high?
What is your compression configuration for your producer? One of the biggest CPU sources for the producer is doing compression and also checksumming. Tim On Sun, May 11, 2014 at 12:24 AM, wrote: > I write a very simple code , like this : > public class LogProducer { > > private Producer inner; > public LogProducer() throws Exception{ > Properties properties = new Properties(); > > properties.load(ClassLoader.getSystemResourceAsStream("producer.properties")); > ProducerConfig config = new ProducerConfig(properties); > inner = new Producer(config); > } > > > public void send(String topicName,String message) { > if(topicName == null || message == null){ > return; > } > KeyedMessage km = new KeyedMessage String>(topicName,message); > inner.send(km); > } > public void close(){ > inner.close(); > } > > /** > * @param args > */ > public static void main(String[] args) { > LogProducer producer = null; > try{ > producer = new LogProducer(); > int i=0; > while(true){ > producer.send("test", "this is a > sample"); > } > }catch(Exception e){ > e.printStackTrace(); > }finally{ > if(producer != null){ > producer.close(); > } > } > > } > > } > ~~ > and the producer.properties like this: > metadata.broker.list=127.0.0.1:9092 > producer.type=async > serializer.class=kafka.serializer.StringEncoder > batch.num.messages=200 > compression.codec=snappy > > I run this procedure on linux, which is 4 core cpu , 16GB memory. 
> I find this procedure using one core cpu totally , this is "top" command > ouput: > > > [root@localhost ~]# top > top - 13:51:09 up 5 days, 13:27, 3 users, load average: 0.96, 0.48, 0.35 > Tasks: 367 total, 3 running, 364 sleeping, 0 stopped, 0 zombie > Cpu0 : 7.0%us, 0.3%sy, 0.0%ni, 92.0%id, 0.7%wa, 0.0%hi, 0.0%si, 0.0%st > Cpu1 : 5.0%us, 0.0%sy, 0.0%ni, 95.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st > Cpu2 : 5.0%us, 0.0%sy, 0.0%ni, 95.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st > Cpu3 : 99.7%us, 0.3%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st > Mem: 16307528k total, 9398376k used, 6909152k free, 249952k buffers > Swap: 8224760k total,0k used, 8224760k free, 6071348k cached > > why producer api use cpu so high ? or maybe I make something wrong ? > > by the way , the kafka version 0.8.0 .
why kafka producer api use cpu so high?
I wrote some very simple code, like this:

public class LogProducer {

    private Producer<String, String> inner;

    public LogProducer() throws Exception {
        Properties properties = new Properties();
        properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
        ProducerConfig config = new ProducerConfig(properties);
        inner = new Producer<String, String>(config);
    }

    public void send(String topicName, String message) {
        if (topicName == null || message == null) {
            return;
        }
        KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, message);
        inner.send(km);
    }

    public void close() {
        inner.close();
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        LogProducer producer = null;
        try {
            producer = new LogProducer();
            while (true) {
                producer.send("test", "this is a sample");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}

and the producer.properties like this:

metadata.broker.list=127.0.0.1:9092
producer.type=async
serializer.class=kafka.serializer.StringEncoder
batch.num.messages=200
compression.codec=snappy

I run this procedure on Linux, on a 4-core CPU with 16GB memory. I find this procedure uses one CPU core completely; this is the "top" command output:

[root@localhost ~]# top
top - 13:51:09 up 5 days, 13:27, 3 users, load average: 0.96, 0.48, 0.35
Tasks: 367 total, 3 running, 364 sleeping, 0 stopped, 0 zombie
Cpu0 : 7.0%us, 0.3%sy, 0.0%ni, 92.0%id, 0.7%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu1 : 5.0%us, 0.0%sy, 0.0%ni, 95.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu2 : 5.0%us, 0.0%sy, 0.0%ni, 95.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Cpu3 : 99.7%us, 0.3%sy, 0.0%ni, 0.0%id, 0.0%wa, 0.0%hi, 0.0%si, 0.0%st
Mem: 16307528k total, 9398376k used, 6909152k free, 249952k buffers
Swap: 8224760k total, 0k used, 8224760k free, 6071348k cached

Why does the producer API use so much CPU? Or maybe I made something wrong? By the way, the Kafka version is 0.8.0.
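As the replies suggest, a more useful benchmark sends a fixed number of messages and reports the achieved rate instead of looping forever. A minimal sketch, where fakeSend is an invented no-op stand-in for the real producer.send() call:

```java
public class RateTestSketch {
    // Invented stand-in for producer.send(); the real call would go to Kafka.
    static void fakeSend(String topic, String message) { /* no-op */ }

    // Send a fixed count and report messages/sec, so CPU usage can be
    // compared against a known workload rather than an unbounded loop.
    static double measureRate(int messages) {
        long start = System.nanoTime();
        for (int i = 0; i < messages; i++) {
            fakeSend("test", "this is a sample");
        }
        double seconds = (System.nanoTime() - start) / 1e9;
        return messages / seconds;
    }

    public static void main(String[] args) {
        System.out.printf("%.0f msgs/sec%n", measureRate(100_000));
    }
}
```

Running a loop like this at the required ~50MB/sec (rather than flat out) shows what the producer actually costs at the target load, which is the number that matters for the app.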
Re: 0.8.1 Java Producer API Callbacks
This summary is correct. If you are just starting development now, it is probably reasonable to start with the new producer, and we would certainly appreciate any feedback on it. My recommendation would be to build the producer off trunk, as there have been a few bug fixes since 0.8.1.x that are worth getting. -Jay
Re: 0.8.1 Java Producer API Callbacks
Hi Christian, As Jun mentioned, the new producer is marked beta since it is new. By 0.9 or even sooner, we'd expect it to be deployed at LinkedIn and a few other companies in a stable state. The APIs, guarantees and features will not change by 0.9, just the stability. Thanks, Neha
Re: 0.8.1 Java Producer API Callbacks
It's beta mostly because it's new. Thanks, Jun
Re: 0.8.1 Java Producer API Callbacks
Thanks, that's quite helpful. According to this post, http://blog.empathybox.com/ , it looks like it will be beta then, which seems good enough. Assuming that the beta designation is correct, is that because it won't have as many features/the same flexibility as it's expected to by 0.9? Christian
Re: 0.8.1 Java Producer API Callbacks
The javadoc of the new producer is here for now - http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html Thanks, Neha
Re: 0.8.1 Java Producer API Callbacks
The new producer (that supports callbacks) is in trunk. It will be released in 0.8.2. You can look at the java doc of KafkaProducer for the api. Thanks, Jun
Re: 0.8.1 Java Producer API Callbacks
It appears that I was looking at the Java client rather than the Scala java api referenced by the documentation https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/javaapi/producer/Producer.scala Are both of these currently suited for use from java and still supported? Given the support for callbacks in the event of failure I am inclined to use the Java one despite the currently limited support for specifying partitioners (though it supports specifying the partition) or encoders. Any guidance on this would be appreciated. Christian
0.8.1 Java Producer API Callbacks
I'm looking at using the java producer api for 0.8.1 and I'm slightly confused by this passage from section 4.4 of https://kafka.apache.org/documentation.html#theproducer "Note that as of Kafka 0.8.1 the async producer does not have a callback, which could be used to register handlers to catch send errors. Adding such callback functionality is proposed for Kafka 0.9, see [Proposed Producer API](https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ProposedProducerAPI)." org.apache.kafka.clients.producer.KafkaProducer in 0.8.1 appears to have public Future send(ProducerRecord record, Callback callback) which looks like the mentioned callback. How do the callbacks work with the async producer? Is it as described in the comment on the send method (see https://github.com/apache/kafka/blob/0.8.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L151 for reference)? Looking around it seems plausible the language in the documentation might refer to a separate sort of callback that existed in 0.7 but not 0.8. In our use case we have something useful to do if we can detect messages failing to be sent. Christian
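For readers following this thread: the send-with-callback API that Christian found can be sketched roughly as below, using the new producer as released in 0.8.2. This is illustrative only — it assumes the kafka-clients jar on the classpath, a broker at localhost:9092, and a topic named test-topic, none of which come from the thread itself.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallbackExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // send() is asynchronous; the callback fires once the broker acks or the send fails.
        producer.send(new ProducerRecord<String, String>("test-topic", "key", "value"),
                new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            // This is the hook for catching send errors discussed above.
                            exception.printStackTrace();
                        } else {
                            System.out.println("Sent to partition " + metadata.partition()
                                    + " at offset " + metadata.offset());
                        }
                    }
                });
        producer.close();
    }
}
```

This matches the `Future send(ProducerRecord record, Callback callback)` signature Christian quotes; the callback-based error handling is exactly what the 0.8.1 documentation said was still "proposed".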
Re: Pattern for using kafka producer API
If you are only worried about throughput, you can use one producer in async mode. You can tune the batch size and time for better performance. Thanks, Jun
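As a rough sketch of the tuning Jun describes, the 0.8.0 (Scala/javaapi) producer exposes the batch size and buffering time as config properties. This is a fragment, not a runnable program; the broker list, encoder, and values are placeholders rather than anything specified in the thread:

```java
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder brokers
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("producer.type", "async");        // buffer and send in a background thread
props.put("batch.num.messages", "200");     // the "batch size": messages per batch
props.put("queue.buffering.max.ms", "500"); // the "time": max ms to buffer before sending

Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
```

Larger batches and longer buffering generally trade latency for throughput, which is why Jun frames this as a throughput-only optimization.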
Pattern for using kafka producer API
What is the most appropriate design for using the kafka producer from a performance viewpoint? I had a few in mind.

1. Since a single kafka producer object has synchronization, using a single producer object from multiple threads might not be efficient, so one way would be to use multiple kafka producers from inside the same thread.

2. Have multiple threads, each with its own instance of the producer. This has thread overheads if kafka internally uses the same semantics.

It would be great if someone could comment on these approaches or suggest a widely used one. P.S. I'm using 0.8.0 and am mostly concerned with the async producer.

Thanks And Regards, Pushkar
Re: Questions about producer API
Thank you, Neha. I mainly wanted to understand if it was because of historic reasons or some fundamental reason. I think managing configuration will be simpler if both sides use ZooKeeper for discovery. Great to hear about the client rewrite project going on. Thanks to you and the other contributors/committers for this great software. Cheers, Roger
Re: Questions about producer API
Agree that it is somewhat awkward to use zookeeper for broker discovery on the consumer, but a broker list on the producer. There were a couple of discussions on the mailing list suggesting using zookeeper on the producer, at least for discovering the brokers for the first time. However, we are starting on the Client Rewrite project, which is targeted for 0.9. That is something we can consider changing in Kafka 0.9. If there is sufficient interest, we can look at making the zookeeper config change on the producer soon. But that is something to discuss on a JIRA. Thanks, Neha
Questions about producer API
Hi, I'm still getting started with Kafka and was curious why there is an asymmetry between the producer and consumer APIs. Why does the producer config take a list of brokers, whereas the consumer config takes a ZooKeeper connection string? Thanks, Roger
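To make the asymmetry Roger is asking about concrete, here is a sketch of the two 0.8-era configs side by side. The host names and group id are placeholders, and this is a fragment rather than a complete program:

```java
import java.util.Properties;

// Producer (0.8): bootstraps from an explicit broker list.
Properties producerProps = new Properties();
producerProps.put("metadata.broker.list", "broker1:9092,broker2:9092");
producerProps.put("serializer.class", "kafka.serializer.StringEncoder");

// High-level consumer (0.8): discovers the brokers via ZooKeeper instead.
Properties consumerProps = new Properties();
consumerProps.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181");
consumerProps.put("group.id", "example-group");
```

The consumer needs ZooKeeper anyway for group coordination and offset storage in 0.8, while the producer only needs one reachable broker to fetch cluster metadata, which is roughly where the asymmetry comes from.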
RE: producer API thread safety
Great. Thanks. Regards, Libo
Re: producer API thread safety
The send() is thread safe, so the short answer would be yes. -- Guozhang
producer API thread safety
Hi team, Is it possible to use a single producer with more than one thread? I am not sure if its send() is thread safe. Regards, Libo
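Since send() is thread safe (per Guozhang's reply in this thread), a single shared producer can be driven from a thread pool. The sketch below is illustrative only: the broker address, topic, thread count, and messages are made up, and it assumes the 0.8 Kafka jars on the classpath.

```java
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SharedProducerExample {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092"); // placeholder
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        // One producer instance shared by all threads: send() is thread safe.
        final Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            final int id = i;
            pool.submit(new Runnable() {
                public void run() {
                    producer.send(new KeyedMessage<String, String>(
                            "test-topic", "key-" + id, "message from thread " + id));
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        producer.close();
    }
}
```

Sharing one instance avoids the per-producer connection and buffer overhead that per-thread producers would multiply.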