sending mailchimp data to kafka cluster using producer api

2017-02-11 Thread VIVEK KUMAR MISHRA 13BIT0066
Hello sir,

I want to send Mailchimp data to a Kafka broker (topic) using the producer API.
Could you please help me?
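
For what it's worth, a minimal sketch of the usual shape of this (assuming the Java client; the Mailchimp fetch and the topic name are illustrative placeholders, not a real integration):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MailchimpToKafka {
    // Hypothetical placeholder: fetch data from Mailchimp's REST API with any HTTP client.
    static String fetchFromMailchimp() { return "{\"example\": \"event\"}"; }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String json = fetchFromMailchimp();
            // Sync send: blocking on the future makes delivery failures surface as exceptions.
            producer.send(new ProducerRecord<>("mailchimp-events", json)).get();
        }
    }
}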


Re: producer api

2015-09-14 Thread Yuheng Du
Thank you Erik. But in my setup, there is only one node in my broker cluster
whose public IP is provided, so I can only use one bootstrap broker
for now.


On Mon, Sep 14, 2015 at 3:50 PM, Helleren, Erik wrote:

> You only need one of the brokers to connect for publishing.  Kafka will
> tell the client about all the other brokers.  But best practice is to
> include all of them.
> -Erik
>
> On 9/14/15, 2:46 PM, "Yuheng Du"  wrote:
>
> >I am writing a kafka producer application in java. I want the producer to
> >publish data to a cluster of 6 brokers. Is there a way to specify only the
> >load-balancing node rather than the full broker list?
> >
> >For example, in the benchmarking kafka commands:
> >
> >bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
> >test 5000 100 -1 acks=-1 bootstrap.servers=
> >esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=64000
> >
> >it only specifies the bootstrap server node but not the full broker list
> >like:
> >
> >Properties props = new Properties();
> >
> >props.put("metadata.broker.list", "broker1:9092,broker2:9092");
> >
> >
> >
> >Thanks for replying.
>
>


Re: producer api

2015-09-14 Thread Helleren, Erik
You only need one of the brokers to connect for publishing.  Kafka will
tell the client about all the other brokers.  But best practice is to
include all of them.
-Erik

On 9/14/15, 2:46 PM, "Yuheng Du"  wrote:

>I am writing a kafka producer application in java. I want the producer to
>publish data to a cluster of 6 brokers. Is there a way to specify only the
>load-balancing node rather than the full broker list?
>
>For example, in the benchmarking kafka commands:
>
>bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
>test 5000 100 -1 acks=-1 bootstrap.servers=
>esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=64000
>
>it only specifies the bootstrap server node but not the full broker list
>like:
>
>Properties props = new Properties();
>
>props.put("metadata.broker.list", "broker1:9092,broker2:9092");
>
>
>
>Thanks for replying.



producer api

2015-09-14 Thread Yuheng Du
I am writing a kafka producer application in java. I want the producer to
publish data to a cluster of 6 brokers. Is there a way to specify only the
load-balancing node rather than the full broker list?

For example, in the benchmarking kafka commands:

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
test 5000 100 -1 acks=-1 bootstrap.servers=
esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=64000

it only specifies the bootstrap server node but not the full broker list
like:

Properties props = new Properties();

props.put("metadata.broker.list", "broker1:9092,broker2:9092");



Thanks for replying.
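
For reference, a minimal sketch of the configuration discussed in the replies above (assuming the Java client; host names are illustrative): one reachable broker in bootstrap.servers is enough for the client to discover the whole cluster, though listing several protects against that one being down at startup.

Properties props = new Properties();
// One reachable broker is sufficient; the client learns about the rest from it.
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);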


Re: New Producer API - batched sync mode support

2015-06-17 Thread Greg Lloyd
@Shapira You are correct from my perspective. We are using kafka for a
system where panels can send multiple events in a single message. The
current contract is such that all events fail or succeed as a whole. If
there is a failure the panel resends all the events. The existing producer
API supports this fine; am I getting left behind here for the sake of
brevity?

I can get behind not adding every feature people ask for but taking away
something is a different story altogether.

On Wed, Apr 29, 2015 at 9:08 PM, Gwen Shapira  wrote:

> I'm starting to think that the old adage "If two people say you are drunk,
> lie down" applies here :)
>
> Current API seems perfectly clear, useful and logical to everyone who wrote
> it... but we are getting multiple users asking for the old batch behavior
> back.
> One reason to get it back is to make upgrades easier - people won't need to
> rethink their existing logic if they get an API with the same behavior in
> the new producer. The other reason is what Ewen mentioned earlier - if
> everyone re-implements Joel's logic, we can provide something for that.
>
> How about getting the old batch send behavior back by adding a new API
> with:
> public void batchSend(List<ProducerRecord>)
>
> With this implementation (mixes the old behavior with Joel's snippet):
> * send records one by one
> * flush
> * iterate on futures and "get" them
> * log a detailed message on each error
> * throw an exception if any send failed.
>
> It reproduces the old behavior - which apparently everyone really liked,
> and I don't think it is overly weird. It is very limited, but anyone who
> needs more control over their sends already has plenty of options.
>
> Thoughts?
>
> Gwen
>
>
>
>
> On Tue, Apr 28, 2015 at 5:29 PM, Jay Kreps  wrote:
>
> > Hey guys,
> >
> > The locking argument is correct for very small records (< 50 bytes),
> > batching will help here because for small records locking becomes the big
> > bottleneck. I think these use cases are rare but not unreasonable.
> >
> > Overall I'd emphasize that the new producer is way faster at virtually
> all
> > use cases. If there is a use case where that isn't true, let's look at it
> > in a data driven way by comparing the old producer to the new producer
> and
> > looking for any areas where things got worse.
> >
> > I suspect the "reducing allocations" argument to be not a big thing. We
> do
> > a number of small per-message allocations and it didn't seem to have much
> > impact. I do think there are a couple of big producer memory
> optimizations
> > we could do by reusing the arrays in the accumulator in the serialization
> > of the request but I don't think this is one of them.
> >
> > I'd be skeptical of any api that was too weird--i.e. introduces a new way
> > of partitioning, gives back errors on a per-partition rather than per
> > message basis (given that partitioning is transparent this is really hard
> > to think about), etc. Bad apis end up causing a ton of churn and just
> don't
> > end up being a good long term commitment as we change how the underlying
> > code works over time (i.e. we hyper optimize for something then have to
> > maintain some super weird api as it becomes hyper unoptimized for the
> > client over time).
> >
> > Roshan--Flush works as you would hope, it blocks on the completion of all
> > outstanding requests. Calling get on the future for the request gives you
> > the associated error code back. Flush doesn't throw any exceptions
> because
> > waiting for requests to complete doesn't error, the individual requests
> > fail or succeed which is always reported with each request.
> >
> > Ivan--The batches you send in the scala producer today actually aren't
> > truly atomic, they just get sent in a single request.
> >
> > One tricky problem to solve when users do batching is size limits on
> > requests. This can be very hard to manage since predicting the serialized
> > size of a bunch of java objects is not always obvious. This was
> repeatedly
> > a problem before.
> >
> > -Jay
> >
> > On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov 
> > wrote:
> >
> > > I must agree with @Roshan – it's hard to imagine anything more intuitive
> > > and easy to use for atomic batching than the old sync batch API. Also,
> > > it's fast. Coupled with a separate instance of producer per
> > > broker:port:topic:partition it works very well. I would be glad if it
> > > finds its way into the new producer API.
> > >
> > > On a side-side-side note, could anyone confirm/deny if SimpleConsumer's
> > > fetchSize must be set to at least the batch size in bytes (before or after
> > > compression)? Otherwise the client risks not getting any messages.
> > >
> >
>


Re: Compatibility of 0.8.2 client API (new Producer API) and 0.8.1 Kafka server

2015-05-27 Thread Zhuo Liu
Hi Jiangjie, 

Thanks for the reply. 
One small concern: kafka-0.8.1 builds with Scala 2.9.1
while kafka-0.8.2 builds with Scala 2.10.
Anyway, I will test it soon to confirm. Thanks.

Best Regards,
Zhuo Liu
Ph.D. Student, CSSE department
Auburn University, AL 36849
http://www.auburn.edu/~zzl0014/


From: Jiangjie Qin 
Sent: Wednesday, May 27, 2015 7:18 PM
To: users@kafka.apache.org
Subject: Re: Compatibility of 0.8.2 client API (new Producer API) and 0.8.1 
Kafka server

It should work, but usually we prefer the server version to be not lower
than the client version.

On 5/27/15, 3:12 PM, "Zhuo Liu"  wrote:

>Dear all,
>
>
>In Kafka 0.8.2.1, there is a new Producer API (KafkaProducer etc.).
>
>My question is: will the 0.8.2.1 new Producer API be able to successfully
>talk to a Kafka server cluster running 0.8.1.1 Kafka?
>
>
>Thanks very much!
>
>
>Best Regards,
>Zhuo Liu
>Ph.D. Student, CSSE department
>Auburn University, AL 36849
>http://www.auburn.edu/~zzl0014/



Re: Compatibility of 0.8.2 client API (new Producer API) and 0.8.1 Kafka server

2015-05-27 Thread Jiangjie Qin
It should work, but usually we prefer the server version to be not lower
than the client version.

On 5/27/15, 3:12 PM, "Zhuo Liu"  wrote:

>Dear all,
>
>
>In Kafka 0.8.2.1, there is a new Producer API (KafkaProducer etc.).
>
>My question is: will the 0.8.2.1 new Producer API be able to successfully
>talk to a Kafka server cluster running 0.8.1.1 Kafka?
>
>
>Thanks very much!
>
>
>Best Regards,
>Zhuo Liu
>Ph.D. Student, CSSE department
>Auburn University, AL 36849
>http://www.auburn.edu/~zzl0014/



Compatibility of 0.8.2 client API (new Producer API) and 0.8.1 Kafka server

2015-05-27 Thread Zhuo Liu
Dear all,


In Kafka 0.8.2.1, there is a new Producer API (KafkaProducer etc.).

My question is: will the 0.8.2.1 new Producer API be able to successfully
talk to a Kafka server cluster running 0.8.1.1 Kafka?


Thanks very much!


Best Regards,
Zhuo Liu
Ph.D. Student, CSSE department
Auburn University, AL 36849
http://www.auburn.edu/~zzl0014/


Re: New Producer API Design

2015-05-14 Thread Mohit Gupta
Thanks guys, I got the point.
We ended up handling the ser/de in custom wrappers over the generic
<byte[], byte[]> producer, ensuring a single producer for the application
without losing type safety.
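
A minimal sketch of such a wrapper (hypothetical class; assuming the Java client's Producer and Serializer interfaces):

import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.Serializer;

public class TypedSender<K, V> {
    private final Producer<byte[], byte[]> producer;   // the single producer shared by the application
    private final Serializer<K> keySerializer;
    private final Serializer<V> valueSerializer;

    public TypedSender(Producer<byte[], byte[]> producer,
                       Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        this.producer = producer;
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
    }

    public Future<RecordMetadata> send(String topic, K key, V value) {
        // Serialize at the client, then hand the bytes to the shared producer.
        return producer.send(new ProducerRecord<>(topic,
                keySerializer.serialize(topic, key),
                valueSerializer.serialize(topic, value)));
    }
}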





On Wed, May 13, 2015 at 11:31 PM, Guozhang Wang  wrote:

> Hello Mohit,
>
> When we originally designed the new producer API we removed the serializer /
> deserializer from the old producer and made it generic, accepting only
> <byte[], byte[]> messages, but we later concluded it would still be more
> beneficial to add the serde back into the producer API. And as you observed
> one consequence is that if you have multiple data serialization formats
> then you have to use one producer for each.
>
> The recommended way of using the Kafka producer in production is to stick with
> one serialization format, e.g. Avro, letting different topics have
> different schemas that can still be ser-/de-'d by the single producer.
>
> Guozhang
>
> On Wed, May 13, 2015 at 10:03 AM, Mohit Gupta <
> success.mohit.gu...@gmail.com
> > wrote:
>
> > Hello,
> >
> > I've a question regarding the design of the new Producer API.
> >
> > As per the design (KafkaProducer<K, V>), it seems that a separate producer
> > is required for every combination of key and value type. Whereas, in the
> > documentation (and elsewhere), it's recommended to create a single
> > producer instance per application and share it among all the threads
> > for best performance?
> >
> > One way to create only a single producer would be to use byte[] as the
> > key/value type and handle the serialization at the client itself, rather
> > than the producer, similar to the example in the javadocs. But wouldn't
> > this defeat the purpose of using generics in the producer?
> >
> > Specific to our use case, we have multiple types of messages, where each
> > message type can have multiple custom serializers. And, a message can be
> > pushed into multiple topics with different serialization.
> >
> >
> > --
> > Best Regards,
> >
> > Mohit Gupta
> >
>
>
>
> --
> -- Guozhang
>



-- 
Best Regards,

Mohit Gupta


Re: New Producer API Design

2015-05-13 Thread Guozhang Wang
Hello Mohit,

When we originally designed the new producer API we removed the serializer /
deserializer from the old producer and made it generic, accepting only
<byte[], byte[]> messages, but we later concluded it would still be more
beneficial to add the serde back into the producer API. And as you observed
one consequence is that if you have multiple data serialization formats
then you have to use one producer for each.

The recommended way of using the Kafka producer in production is to stick with
one serialization format, e.g. Avro, letting different topics have
different schemas that can still be ser-/de-'d by the single producer.

Guozhang

On Wed, May 13, 2015 at 10:03 AM, Mohit Gupta  wrote:

> Hello,
>
> I've a question regarding the design of the new Producer API.
>
> As per the design (KafkaProducer<K, V>), it seems that a separate producer
> is required for every combination of key and value type. Whereas, in the
> documentation (and elsewhere), it's recommended to create a single
> producer instance per application and share it among all the threads
> for best performance?
>
> One way to create only a single producer would be to use byte[] as the key/value
> type and handle the serialization at the client itself, rather than the
> producer, similar to the example in the javadocs. But wouldn't this defeat the
> purpose of using generics in the producer?
>
> Specific to our use case, we have multiple types of messages, where each
> message type can have multiple custom serializers. And, a message can be
> pushed into multiple topics with different serialization.
>
>
> --
> Best Regards,
>
> Mohit Gupta
>



-- 
-- Guozhang


Re: New Producer API Design

2015-05-13 Thread Ewen Cheslack-Postava
You can of course use KafkaProducer<Object, Object> to get a producer
interface that can accept a variety of types. For example, if you have an
Avro serializer that accepts both primitive types (e.g. String, integer
types) and complex types (e.g. records, arrays, maps), Object is the only
type you can use to cover all of those. As long as your serializer supports
it, you can use a general type and pass in a variety of types to a single
producer.

The drawback is that you don't get feedback at compile time if you pass in
a type that you weren't expecting. For example, if you know your keys are
always going to be Strings, it's probably a good idea to use a
KafkaProducer with String as the key type so that you catch a case where you
accidentally pass in a different object. There are a lot of use cases where
an application is only producing a single format of data, so supporting the
type checking can be valuable.

The type checking isn't going to be perfect because of type erasure and
since serializers are often instantiated via reflection. However, having
the type information can offer some compile-time protection to application
code using the clients.

-Ewen
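
A small sketch of the trade-off Ewen describes (a hedged illustration; "props" is assumed to configure serializers that accept the types used, e.g. an Avro serializer for the Object case):

KafkaProducer<Object, Object> generic = new KafkaProducer<>(props);
generic.send(new ProducerRecord<>("topic", "string-key", 42));   // compiles: any type the serializer supports

KafkaProducer<String, String> typed = new KafkaProducer<>(props);
// typed.send(new ProducerRecord<>("topic", 42, "value"));       // rejected at compile time: key must be a String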

On Wed, May 13, 2015 at 10:03 AM, Mohit Gupta  wrote:

> Hello,
>
> I've a question regarding the design of the new Producer API.
>
> As per the design (KafkaProducer<K, V>), it seems that a separate producer
> is required for every combination of key and value type. Whereas, in the
> documentation (and elsewhere), it's recommended to create a single
> producer instance per application and share it among all the threads
> for best performance?
>
> One way to create only a single producer would be to use byte[] as the key/value
> type and handle the serialization at the client itself, rather than the
> producer, similar to the example in the javadocs. But wouldn't this defeat the
> purpose of using generics in the producer?
>
> Specific to our use case, we have multiple types of messages, where each
> message type can have multiple custom serializers. And, a message can be
> pushed into multiple topics with different serialization.
>
>
> --
> Best Regards,
>
> Mohit Gupta
>



-- 
Thanks,
Ewen


New Producer API Design

2015-05-13 Thread Mohit Gupta
Hello,

I've a question regarding the design of the new Producer API.

As per the design (KafkaProducer<K, V>), it seems that a separate producer
is required for every combination of key and value type. Whereas, in the
documentation (and elsewhere), it's recommended to create a single
producer instance per application and share it among all the threads
for best performance?

One way to create only a single producer would be to use byte[] as the key/value
type and handle the serialization at the client itself, rather than the
producer, similar to the example in the javadocs. But wouldn't this defeat the
purpose of using generics in the producer?

Specific to our use case, we have multiple types of messages, where each
message type can have multiple custom serializers. And, a message can be
pushed into multiple topics with different serialization.


-- 
Best Regards,

Mohit Gupta


Re: Doubt regarding new Producer and old Producer API

2015-05-13 Thread Guozhang Wang
Hi Madhukar,

1. The Java producer API can also be used for sync calls; you can do it with
producer.send().get().
2. The partitioner class has been removed from the new producer API;
instead, the message can now carry a specific partition id. You could
calculate the partition id with your customized partitioner beforehand and
put it into the message.

Guozhang
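
A minimal sketch of both points (assuming the Java client, a configured KafkaProducer<String, String> named producer, and an illustrative partition count):

// 1. Sync call: block on the returned future.
producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();

// 2. Compute the partition id beforehand with your own partitioner logic,
//    then put it into the record explicitly.
int numPartitions = 6;   // hypothetical
int partition = Math.abs("key".hashCode()) % numPartitions;
producer.send(new ProducerRecord<>("my-topic", partition, "key", "value"));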

On Wed, May 13, 2015 at 4:05 AM, Madhukar Bharti 
wrote:

> Hi all,
>
> What are the possible use cases for using the new producer API?
> - Does it only provide async calls with a callback feature?
> - Has the partitioner class been removed from the new Producer API? If not,
> how do I implement it if I want to use only the client APIs?
>
>
>
> Regards,
> Madhukar
>



-- 
-- Guozhang


Doubt regarding new Producer and old Producer API

2015-05-13 Thread Madhukar Bharti
Hi all,

What are the possible use cases for using the new producer API?
- Does it only provide async calls with a callback feature?
- Has the partitioner class been removed from the new Producer API? If not,
how do I implement it if I want to use only the client APIs?



Regards,
Madhukar


Re: Differences between new and legacy scala producer API

2015-05-08 Thread Manoj Khangaonkar
On Thu, May 7, 2015 at 10:01 PM, Rendy Bambang Junior <
rendy.b.jun...@gmail.com> wrote:

> Hi
>
> - The legacy Scala producer API has a keyed message with topic, key,
> partKey, and message. Meanwhile the new API has no partKey. What's the
> difference between key and partKey?
>

In the new API, the key is encapsulated in the ProducerRecord.

regards



-- 
http://khangaonkar.blogspot.com/
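
Roughly how the old fields map onto the new API (an illustrative sketch, not from the thread):

// Old Scala API: new KeyedMessage(topic, key, partKey, message) -- partKey drove partitioning.
// New API: the key drives partitioning by default, or an explicit partition id can be given.
ProducerRecord<String, String> byKey = new ProducerRecord<>("my-topic", "key", "message");
ProducerRecord<String, String> byPartition = new ProducerRecord<>("my-topic", 3, "key", "message");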


Differences between new and legacy scala producer API

2015-05-07 Thread Rendy Bambang Junior
Hi

- The legacy Scala producer API has a keyed message with topic, key,
partKey, and message. Meanwhile the new API has no partKey. What's the
difference between key and partKey?
- In the javadoc, the new producer API's send method is always async; is the
producer.type property overridden?
- Will the legacy Scala API be deprecated any time soon?

Rendy


Re: New Producer API - batched sync mode support

2015-05-02 Thread Jay Kreps
> > message basis (given that partitioning is transparent this is really hard
> > to think about), etc. Bad apis end up causing a ton of churn and just
> don't
> > end up being a good long term commitment as we change how the underlying
> > code works over time (i.e. we hyper optimize for something then have to
> > maintain some super weird api as it becomes hyper unoptimized for the
> > client over time).
> >
> > Roshan--Flush works as you would hope, it blocks on the completion of all
> > outstanding requests. Calling get on the future for the request gives you
> > the associated error code back. Flush doesn't throw any exceptions
> because
> > waiting for requests to complete doesn't error, the individual requests
> > fail or succeed which is always reported with each request.
> >
> > Ivan--The batches you send in the scala producer today actually aren't
> > truly atomic, they just get sent in a single request.
> >
> > One tricky problem to solve when users do batching is size limits on
> > requests. This can be very hard to manage since predicting the serialized
> > size of a bunch of java objects is not always obvious. This was
> repeatedly
> > a problem before.
> >
> > -Jay
> >
> > On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov 
> > wrote:
> >
> > > I must agree with @Roshan – it's hard to imagine anything more intuitive
> > > and easy to use for atomic batching than the old sync batch API. Also,
> > > it's fast. Coupled with a separate instance of producer per
> > > broker:port:topic:partition it works very well. I would be glad if it
> > > finds its way into the new producer API.
> > >
> > > On a side-side-side note, could anyone confirm/deny if SimpleConsumer's
> > > fetchSize must be set to at least the batch size in bytes (before or after
> > > compression)? Otherwise the client risks not getting any messages.
> > >
> >
>


Re: New Producer API - batched sync mode support

2015-04-30 Thread Jiangjie Qin
Roshan,

If I understand correctly, you just want to make sure a number of messages
has been sent successfully. Using a callback might be easier to do so.

public class MyCallback implements Callback {
    public Set<RecordMetadata> failedSend = new HashSet<>();

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null)
            failedSend.add(metadata);   // remember metadata for sends that failed
    }

    public boolean hasFailure() { return failedSend.size() > 0; }
}

In the main code, you just need to do the following:

MyCallback callback = new MyCallback();
for (ProducerRecord record : records)
    producer.send(record, callback);

producer.flush();
if (callback.hasFailure()) {
    // do something
}

This will avoid the loop checking and provide you pretty much the same
guarantee as the old producer, if not better.

Jiangjie (Becket) Qin


On 4/30/15, 4:54 PM, "Roshan Naik"  wrote:

>@Gwen, @Ewen,
>  While atomicity of a batch is nice to have, it is not essential. I don't
>think users always expect such atomicity. Atomicity is not even guaranteed
>in many un-batched systems, let alone batched systems.
>
>As long as the client gets informed about the ones that failed in the
>batch, that would suffice.
>
>One issue with the current flush() based batch-sync implementation is that
>the client needs to iterate over *all* futures in order to scan for any
>failed messages. In the common case, it is just wasted CPU cycles as there
>won't be any failures. Would be ideal if the client is informed about only
>problematic messages.
>
>  IMO, adding a new send(batch) API may be meaningful if it can provide
>benefits beyond what a user can do with a simple wrapper on existing stuff.
>For example: eliminate the CPU cycles wasted on examining results from
>successful message deliveries, or other efficiencies.
>
>
>
>@Ivan,
>   I am not certain, but I think there is a possibility that the
>first few messages of the batch got accepted but not the remainder. At
>the same time, based on some comments made earlier, it appears the underlying
>implementation does have an all-or-none mechanism for a batch going to a
>partition.
>For simplicity, streaming clients may not want to deal explicitly with
>partitions (and get exposed to repartitioning & leader change type issues)
>
>-roshan
>
>
>
>On 4/30/15 2:07 PM, "Gwen Shapira"  wrote:
>
>>Why do we think atomicity is expected, if the old API we are emulating
>>here
>>lacks atomicity?
>>
>>I don't remember emails to the mailing list saying: "I expected this
>>batch
>>to be atomic, but instead I got duplicates when retrying after a failed
>>batch send".
>>Maybe atomicity isn't as strong a requirement as we believe? That is,
>>everyone expects some duplicates during failure events and handles them
>>downstream?
>>
>>
>>
>>On Thu, Apr 30, 2015 at 2:02 PM, Ivan Balashov 
>>wrote:
>>
>>> 2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava :
>>>
>>> > They aren't going to get this anyway (as Jay pointed out) given the
>>> current
>>> > broker implementation
>>> >
>>>
>>> Is it also incorrect to assume atomicity even if all messages in the
>>>batch
>>> go to the same partition?
>>>
>



Re: New Producer API - batched sync mode support

2015-04-30 Thread Roshan Naik
@Gwen, @Ewen,
  While atomicity of a batch is nice to have, it is not essential. I don't
think users always expect such atomicity. Atomicity is not even guaranteed
in many un-batched systems, let alone batched systems.

As long as the client gets informed about the ones that failed in the
batch, that would suffice.

One issue with the current flush() based batch-sync implementation is that
the client needs to iterate over *all* futures in order to scan for any
failed messages. In the common case, it is just wasted CPU cycles as there
won't be any failures. Would be ideal if the client is informed about only
problematic messages.

  IMO, adding a new send(batch) API may be meaningful if it can provide
benefits beyond what a user can do with a simple wrapper on existing stuff.
For example: eliminate the CPU cycles wasted on examining results from
successful message deliveries, or other efficiencies.



@Ivan,
   I am not certain, but I think there is a possibility that the
first few messages of the batch got accepted but not the remainder. At
the same time, based on some comments made earlier, it appears the underlying
implementation does have an all-or-none mechanism for a batch going to a
partition.
For simplicity, streaming clients may not want to deal explicitly with
partitions (and get exposed to repartitioning & leader change type issues)

-roshan



On 4/30/15 2:07 PM, "Gwen Shapira"  wrote:

>Why do we think atomicity is expected, if the old API we are emulating
>here
>lacks atomicity?
>
>I don't remember emails to the mailing list saying: "I expected this batch
>to be atomic, but instead I got duplicates when retrying after a failed
>batch send".
>Maybe atomicity isn't as strong a requirement as we believe? That is,
>everyone expects some duplicates during failure events and handles them
>downstream?
>
>
>
>On Thu, Apr 30, 2015 at 2:02 PM, Ivan Balashov 
>wrote:
>
>> 2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava :
>>
>> > They aren't going to get this anyway (as Jay pointed out) given the
>> current
>> > broker implementation
>> >
>>
>> Is it also incorrect to assume atomicity even if all messages in the
>>batch
>> go to the same partition?
>>



Re: New Producer API - batched sync mode support

2015-04-30 Thread Gwen Shapira
Why do we think atomicity is expected, if the old API we are emulating here
lacks atomicity?

I don't remember emails to the mailing list saying: "I expected this batch
to be atomic, but instead I got duplicates when retrying after a failed
batch send".
Maybe atomicity isn't as strong a requirement as we believe? That is,
everyone expects some duplicates during failure events and handles them
downstream?



On Thu, Apr 30, 2015 at 2:02 PM, Ivan Balashov  wrote:

> 2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava :
>
> > They aren't going to get this anyway (as Jay pointed out) given the
> current
> > broker implementation
> >
>
> Is it also incorrect to assume atomicity even if all messages in the batch
> go to the same partition?
>


Re: New Producer API - batched sync mode support

2015-04-30 Thread Ivan Balashov
2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava :

> They aren't going to get this anyway (as Jay pointed out) given the current
> broker implementation
>

Is it also incorrect to assume atomicity even if all messages in the batch
go to the same partition?


Re: New Producer API - batched sync mode support

2015-04-29 Thread Ewen Cheslack-Postava
> > looking for any areas where things got worse.
> >
> > I suspect the "reducing allocations" argument to be not a big thing. We
> do
> > a number of small per-message allocations and it didn't seem to have much
> > impact. I do think there are a couple of big producer memory
> optimizations
> > we could do by reusing the arrays in the accumulator in the serialization
> > of the request but I don't think this is one of them.
> >
> > I'd be skeptical of any api that was too weird--i.e. introduces a new way
> > of partitioning, gives back errors on a per-partition rather than per
> > message basis (given that partitioning is transparent this is really hard
> > to think about), etc. Bad apis end up causing a ton of churn and just
> don't
> > end up being a good long term commitment as we change how the underlying
> > code works over time (i.e. we hyper optimize for something then have to
> > maintain some super weird api as it becomes hyper unoptimized for the
> > client over time).
> >
> > Roshan--Flush works as you would hope, it blocks on the completion of all
> > outstanding requests. Calling get on the future for the request gives you
> > the associated error code back. Flush doesn't throw any exceptions
> because
> > waiting for requests to complete doesn't error, the individual requests
> > fail or succeed which is always reported with each request.
> >
> > Ivan--The batches you send in the scala producer today actually aren't
> > truly atomic, they just get sent in a single request.
> >
> > One tricky problem to solve when users do batching is size limits on
> > requests. This can be very hard to manage since predicting the serialized
> > size of a bunch of java objects is not always obvious. This was
> repeatedly
> > a problem before.
> >
> > -Jay
> >
> > On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov 
> > wrote:
> >
> > > I must agree with @Roshan – it's hard to imagine anything more intuitive
> > > and easy to use for atomic batching than the old sync batch API. Also,
> > > it's fast. Coupled with a separate instance of producer per
> > > broker:port:topic:partition it works very well. I would be glad if it
> > > finds its way into the new producer API.
> > >
> > > On a side-side-side note, could anyone confirm/deny if SimpleConsumer's
> > > fetchSize must be set to at least the batch size in bytes (before or after
> > > compression)? Otherwise the client risks not getting any messages.
> > >
> >
>



-- 
Thanks,
Ewen


Re: New Producer API - batched sync mode support

2015-04-29 Thread Gwen Shapira
I'm starting to think that the old adage "If two people say you are drunk,
lie down" applies here :)

Current API seems perfectly clear, useful and logical to everyone who wrote
it... but we are getting multiple users asking for the old batch behavior
back.
One reason to get it back is to make upgrades easier - people won't need to
rethink their existing logic if they get an API with the same behavior in
the new producer. The other reason is what Ewen mentioned earlier - if
everyone re-implements Joel's logic, we can provide something for that.

How about getting the old batch send behavior back by adding a new API with:
public void batchSend(List<ProducerRecord>)

With this implementation (mixes the old behavior with Joel's snippet):
* send records one by one
* flush
* iterate on futures and "get" them
* log a detailed message on each error
* throw an exception if any send failed.
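
A minimal sketch of that helper on top of the existing API (hypothetical code, not the proposed API itself):

public static void batchSend(Producer<byte[], byte[]> producer,
                             List<ProducerRecord<byte[], byte[]>> records) {
    List<Future<RecordMetadata>> futures = new ArrayList<>();
    for (ProducerRecord<byte[], byte[]> record : records)
        futures.add(producer.send(record));             // send records one by one
    producer.flush();                                   // flush
    boolean failed = false;
    for (Future<RecordMetadata> future : futures) {
        try {
            future.get();                               // iterate on futures and "get" them
        } catch (Exception e) {
            System.err.println("send failed: " + e);    // log a detailed message on each error
            failed = true;
        }
    }
    if (failed)
        throw new RuntimeException("at least one send in the batch failed");
}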

It reproduces the old behavior - which apparently everyone really liked,
and I don't think it is overly weird. It is very limited, but anyone who
needs more control over their sends already has plenty of options.

Thoughts?

Gwen




On Tue, Apr 28, 2015 at 5:29 PM, Jay Kreps  wrote:

> Hey guys,
>
> The locking argument is correct for very small records (< 50 bytes),
> batching will help here because for small records locking becomes the big
> bottleneck. I think these use cases are rare but not unreasonable.
>
> Overall I'd emphasize that the new producer is way faster at virtually all
> use cases. If there is a use case where that isn't true, let's look at it
> in a data driven way by comparing the old producer to the new producer and
> looking for any areas where things got worse.
>
> I suspect the "reducing allocations" argument to be not a big thing. We do
> a number of small per-message allocations and it didn't seem to have much
> impact. I do think there are a couple of big producer memory optimizations
> we could do by reusing the arrays in the accumulator in the serialization
> of the request but I don't think this is one of them.
>
> I'd be skeptical of any api that was too weird--i.e. introduces a new way
> of partitioning, gives back errors on a per-partition rather than per
> message basis (given that partitioning is transparent this is really hard
> to think about), etc. Bad apis end up causing a ton of churn and just don't
> end up being a good long term commitment as we change how the underlying
> code works over time (i.e. we hyper optimize for something then have to
> maintain some super weird api as it becomes hyper unoptimized for the
> client over time).
>
> Roshan--Flush works as you would hope, it blocks on the completion of all
> outstanding requests. Calling get on the future for the request gives you
> the associated error code back. Flush doesn't throw any exceptions because
> waiting for requests to complete doesn't error, the individual requests
> fail or succeed which is always reported with each request.
>
> Ivan--The batches you send in the scala producer today actually aren't
> truly atomic, they just get sent in a single request.
>
> One tricky problem to solve when users do batching is size limits on
> requests. This can be very hard to manage since predicting the serialized
> size of a bunch of java objects is not always obvious. This was repeatedly
> a problem before.
>
> -Jay
>
> On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov 
> wrote:
>
> > I must agree with @Roshan – it's hard to imagine anything more intuitive
> > and easy to use for atomic batching than the old sync batch API. Also,
> > it's fast. Coupled with a separate instance of producer per
> > broker:port:topic:partition it works very well. I would be glad if it
> > finds its way into the new producer API.
> >
> > On a side-side-side note, could anyone confirm/deny if SimpleConsumer's
> > fetchSize must be set to at least the batch size in bytes (before or after
> > compression)? Otherwise the client risks not getting any messages.
> >
>


Re: New Producer API - batched sync mode support

2015-04-28 Thread Jay Kreps
Hey guys,

The locking argument is correct for very small records (< 50 bytes),
batching will help here because for small records locking becomes the big
bottleneck. I think these use cases are rare but not unreasonable.

Overall I'd emphasize that the new producer is way faster at virtually all
use cases. If there is a use case where that isn't true, let's look at it
in a data driven way by comparing the old producer to the new producer and
looking for any areas where things got worse.

I suspect the "reducing allocations" argument to be not a big thing. We do
a number of small per-message allocations and it didn't seem to have much
impact. I do think there are a couple of big producer memory optimizations
we could do by reusing the arrays in the accumulator in the serialization
of the request but I don't think this is one of them.

I'd be skeptical of any api that was too weird--i.e. introduces a new way
of partitioning, gives back errors on a per-partition rather than per
message basis (given that partitioning is transparent this is really hard
to think about), etc. Bad apis end up causing a ton of churn and just don't
end up being a good long term commitment as we change how the underlying
code works over time (i.e. we hyper optimize for something then have to
maintain some super weird api as it becomes hyper unoptimized for the
client over time).

Roshan--Flush works as you would hope, it blocks on the completion of all
outstanding requests. Calling get on the future for the request gives you
the associated error code back. Flush doesn't throw any exceptions because
waiting for requests to complete doesn't error, the individual requests
fail or succeed which is always reported with each request.

Ivan--The batches you send in the scala producer today actually aren't
truly atomic, they just get sent in a single request.

One tricky problem to solve when users do batching is size limits on
requests. This can be very hard to manage since predicting the serialized
size of a bunch of java objects is not always obvious. This was repeatedly
a problem before.

-Jay

On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov  wrote:

> I must agree with @Roshan – it's hard to imagine anything more intuitive
> and easy to use for atomic batching than the old sync batch API. Also, it's fast.
> Coupled with a separate instance of producer per
> broker:port:topic:partition it works very well. I would be glad if it finds
> its way into the new producer API.
>
> On a side-side-side note, could anyone confirm/deny if SimpleConsumer's
> fetchSize must be set to at least the batch size in bytes (before or after
> compression)? Otherwise the client risks not getting any messages.
>


Re: New Producer API - batched sync mode support

2015-04-28 Thread Ivan Balashov
I must agree with @Roshan – it's hard to imagine anything more intuitive
and easy to use for atomic batching than the old sync batch API. Also, it's fast.
Coupled with a separate instance of producer per
broker:port:topic:partition it works very well. I would be glad if it finds
its way into the new producer API.

On a side-side-side note, could anyone confirm/deny if SimpleConsumer's
fetchSize must be set to at least the batch size in bytes (before or after
compression)? Otherwise the client risks not getting any messages.
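
For context on that side question, a sketch of where fetchSize enters the old SimpleConsumer API (assuming the 0.8-era kafka.javaapi classes; host and topic are illustrative). With compression, a compressed batch is stored and delivered as a single wrapper message, so the fetch size must be large enough to cover the whole compressed batch:

SimpleConsumer consumer = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "my-client");
long offset = 0L;
int fetchSize = 1024 * 1024;   // must cover the largest (possibly compressed wrapper) message
FetchRequest req = new FetchRequestBuilder()
        .clientId("my-client")
        .addFetch("my-topic", 0, offset, fetchSize)
        .build();
FetchResponse resp = consumer.fetch(req);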


Re: New Producer API - batched sync mode support

2015-04-28 Thread Roshan Naik
@Ewen
 No I did not use compression in my measurements.



Re: New Producer API - batched sync mode support

2015-04-28 Thread Roshan Naik
@Joel,
If flush() works for this use case it may be an acceptable starting point
(although not as clean as a native batched sync). I am not as yet clear
about some aspects of flush's batch semantics and its suitability for this
mode of operation. Allow me to explore it with you folks.

 1) flush() guarantees: What guarantees can one expect when a
flush() call returns?  Is it successful delivery of all events in the
buffer to the broker as per the configured ack setting?

 2) flush() error handling:  It does not throw any exception. What is the
mechanism for indicating failure in delivery of one or more events in the
batch? Is future.get() the way to detect it? If so, will future.get() be
applicable to all types of delivery failures (could be a network glitch or
something simpler like Kafka responding that it was unable to
accept some events)?


 3) Multithreaded clients: The situation being that each client thread is
trying to push out a batch. flush() will pump out data from not just
the calling thread but also from other threads. Need to think
through this a bit more to see if that's OK for multithreaded clients.


 4) Extra synchronization and object creation: Like Ewen pointed out, this
method definitely creates too many (Future) objects and also too much
locking/synchronization due to repeated calls to Producer.send() and
future.get(). However, if the measured perf impact of this is not too much,
then I guess it's OK.


@Ewen,
  I am unable to find the email where I mentioned a 10-byte event size. To
me a 500-byte to 1kB event size is more interesting. I had run measurements
with event sizes of 500 bytes, 1k, 4k, 8k and 16k. In the 0.8.1 producer_perf_test
tool, the 8k and 16k event sizes showed much better throughput in --sync
mode (throughput almost doubled with doubling of event size). The perf
tool was not using the Producer.send(List<>) API though. I saw great
improvement when I changed it to use Producer.send(List<>). Sometimes
it easily exceeded the throughput of async mode.



-roshan





On 4/27/15 10:32 PM, "Ewen Cheslack-Postava"  wrote:

>A couple of thoughts:
>
>1. @Joel I agree it's not hard to use the new API but it definitely is
>more
>verbose. If that snippet of code is being written across hundreds of
>projects, that probably means we're missing an important API. Right now
>I've only seen the one complaint, but it's worth finding out how many
>people feel like it's missing. And given that internally each of the
>returned Futures just uses the future for the entire batch, I think it's
>probably worth investigating if getting rid of millions of allocs per
>second is worth it, even if they should be in the nursery and fast to
>collect.
>
>2. For lots of small messages, there's definitely the potential for a
>performance benefit by avoiding a lot of lock acquire/release in send().
>If
>you make a first pass to organize by topic partition and then process each
>group, you lock # of partitions times rather than # of messages times. One
>major drawback I see is that it seems to make a mess of error
>handling/blocking when the RecordAccumulator runs out of space.
>
>3. @Roshan In the other thread you mentioned 10 byte messages. Is this a
>realistic payload size for you? I can imagine applications where it is
>(and
>we should support those well), it just sounds unusually small.
>
>4. I reproduced Jay's benchmark blog post awhile ago in an automated test
>(see
>https://github.com/confluentinc/muckrake/blob/master/muckrake/tests/kafka_benchmark_test.py).
>Here's a snippet from the output on m3.2xlarge instances that might help
>shed some light on the situation:
>INFO:_.KafkaBenchmark:Message size:
>INFO:_.KafkaBenchmark: 10: 1637825.195625 rec/sec (15.62 MB/s)
>INFO:_.KafkaBenchmark: 100: 605504.877911 rec/sec (57.75 MB/s)
>INFO:_.KafkaBenchmark: 1000: 90351.817570 rec/sec (86.17 MB/s)
>INFO:_.KafkaBenchmark: 10000: 8306.180862 rec/sec (79.21 MB/s)
>INFO:_.KafkaBenchmark: 100000: 978.403499 rec/sec (93.31 MB/s)
>
>That's using the single-threaded new ProducerPerformance class, so the
>m3.2xlarge's # of cores probably has little influence. There's clearly a
>sharp increase in throughput from 10 -> 100 byte messages. I recall double
>checking that the CPU was fully utilized. Note that this is with the
>acks=1
>setting that doesn't actually exist anymore, so take with a grain of salt.
>
>5. I'd suggest that there may be other APIs that give the implementation
>more flexibility but still provide batching. For example:
>* Require batched inputs to be prepartitioned so each call specifies the
>TopicPartition. Main benefit here is that the producer avoids having to do
>all the sorting, which the application may already be doing anyway.
>* How about an API similar to fwrite() where you provide a set of messages
>but it may only write some of them and tells you how many it wrote? This
>could be a clean way to expose the underlying batching that is performed
>without being a completely leaky abstraction.

Re: New Producer API - batched sync mode support

2015-04-27 Thread Ewen Cheslack-Postava
A couple of thoughts:

1. @Joel I agree it's not hard to use the new API but it definitely is more
verbose. If that snippet of code is being written across hundreds of
projects, that probably means we're missing an important API. Right now
I've only seen the one complaint, but it's worth finding out how many
people feel like it's missing. And given that internally each of the
returned Futures just uses the future for the entire batch, I think it's
probably worth investigating if getting rid of millions of allocs per
second is worth it, even if they should be in the nursery and fast to
collect.

2. For lots of small messages, there's definitely the potential for a
performance benefit by avoiding a lot of lock acquire/release in send(). If
you make a first pass to organize by topic partition and then process each
group, you lock # of partitions times rather than # of messages times. One
major drawback I see is that it seems to make a mess of error
handling/blocking when the RecordAccumulator runs out of space.

3. @Roshan In the other thread you mentioned 10 byte messages. Is this a
realistic payload size for you? I can imagine applications where it is (and
we should support those well), it just sounds unusually small.

4. I reproduced Jay's benchmark blog post awhile ago in an automated test
(see
https://github.com/confluentinc/muckrake/blob/master/muckrake/tests/kafka_benchmark_test.py).
Here's a snippet from the output on m3.2xlarge instances that might help
shed some light on the situation:
INFO:_.KafkaBenchmark:Message size:
INFO:_.KafkaBenchmark: 10: 1637825.195625 rec/sec (15.62 MB/s)
INFO:_.KafkaBenchmark: 100: 605504.877911 rec/sec (57.75 MB/s)
INFO:_.KafkaBenchmark: 1000: 90351.817570 rec/sec (86.17 MB/s)
INFO:_.KafkaBenchmark: 10000: 8306.180862 rec/sec (79.21 MB/s)
INFO:_.KafkaBenchmark: 100000: 978.403499 rec/sec (93.31 MB/s)

That's using the single-threaded new ProducerPerformance class, so the
m3.2xlarge's # of cores probably has little influence. There's clearly a
sharp increase in throughput from 10 -> 100 byte messages. I recall double
checking that the CPU was fully utilized. Note that this is with the acks=1
setting that doesn't actually exist anymore, so take with a grain of salt.

5. I'd suggest that there may be other APIs that give the implementation
more flexibility but still provide batching. For example:
* Require batched inputs to be prepartitioned so each call specifies the
TopicPartition. Main benefit here is that the producer avoids having to do
all the sorting, which the application may already be doing anyway.
* How about an API similar to fwrite() where you provide a set of messages
but it may only write some of them and tells you how many it wrote? This
could be a clean way to expose the underlying batching that is performed
without being a completely leaky abstraction. We could then return just a
single future for the entire batch, we'd do minimal locking, etc. Not sure
how to handle different TopicPartitions in the same set. I think this could
be a good pattern for people who want maximally efficient ordered writes
where errors are properly handled too.
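
A hypothetical shape for that fwrite()-style idea (illustrative only; not an existing Kafka interface):

// Sketch: append records in order; return how many were accepted, like fwrite().
public interface BatchingProducer<K, V> {
    int trySend(List<ProducerRecord<K, V>> records);
}

The caller would loop, re-submitting the unwritten tail; partial writes stay explicit and ordering is preserved.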

6. If I recall correctly, doesn't compression occur in a synchronized
block, I think in the RecordAccumulator? Or maybe it was in the network
thread? In any case, I seem to recall compression also possibly playing an
important role in performance because it operates over a set of records
which limits where you can run it. @Roshan, are you using compression, both
in your microbenchmarks and your application?

I think there's almost definitely a good case to be made for a batch API,
but probably needs some very clear motivating use cases and perf
measurements showing why it's not going to be feasible to accomplish with
the current API + a few helpers to wrap it in a batch API.

-Ewen


On Mon, Apr 27, 2015 at 4:24 PM, Joel Koshy  wrote:

>
> >   Fine grained tracking of status of individual events is quite painful
> in
> > contrast to simply blocking on every batch. Old style Batched-sync mode
> > has great advantages in terms of simplicity and performance.
>
> I may be missing something, but I'm not so convinced that it is that
> painful/very different from the old-style.
>
> In the old approach, you would compose a batch (in a list of messages)
> and do a synchronous send:
>
> try {
>   producer.send(recordsToSend);
> } catch (Exception e) {
>   // handle (e.g., retry sending recordsToSend)
> }
>
> In the new approach, you would do (something like) this:
>
> List<Future<RecordMetadata>> futureList = new ArrayList<>();
> for (ProducerRecord record : recordsToSend) {
>   futureList.add(producer.send(record));
> }
> producer.flush();
> for (Future<RecordMetadata> result : futureList) {
>   try { result.get(); }
>   catch (Exception e) { /* handle (e.g., retry sending recordsToSend) */ }
> }
>
>
>


-- 
Thanks,
Ewen


Re: New Producer API - batched sync mode support

2015-04-27 Thread Joel Koshy

>   Fine grained tracking of status of individual events is quite painful in
> contrast to simply blocking on every batch. Old style Batched-sync mode
> has great advantages in terms of simplicity and performance.

I may be missing something, but I'm not so convinced that it is that
painful/very different from the old-style.

In the old approach, you would compose a batch (in a list of messages)
and do a synchronous send:

try {
  producer.send(recordsToSend);
} catch (Exception e) {
  // handle (e.g., retry sending recordsToSend)
}

In the new approach, you would do (something like) this:

List<Future<RecordMetadata>> futureList = new ArrayList<>();
for (ProducerRecord record : recordsToSend) {
  futureList.add(producer.send(record));
}
producer.flush();
for (Future<RecordMetadata> result : futureList) {
  try { result.get(); }
  catch (Exception e) { /* handle (e.g., retry sending recordsToSend) */ }
}




Re: New Producer API - batched sync mode support

2015-04-27 Thread Roshan Naik


On 4/27/15 2:59 PM, "Gwen Shapira"  wrote:

>@Roshan - if the data was already written to Kafka, your approach will
>generate LOTS of duplicates. I'm not convinced its ideal.


Only if the delivery failure rate is very high (i.e. short-lived but very
frequent).  These batch semantics are not uncommon;
also, since Kafka had sync batching in the past, it would not be new to
Kafka either.

But, like I had mentioned, there is an alternative to mitigate this
duplication... the return value could indicate failed messages.



Wanted to add that these failures typically assume that we got an
"error" back from the Kafka broker. There are other modes of failure (like
network glitches) which will mean you only get some (potentially not very
informative) exception.



>
>What's wrong with callbacks?


The complexities of fine-grained tracking that I described in the previous
email.




Re: New Producer API - batched sync mode support

2015-04-27 Thread Gwen Shapira
@Roshan - if the data was already written to Kafka, your approach will
generate LOTS of duplicates. I'm not convinced its ideal.

What's wrong with callbacks?

On Mon, Apr 27, 2015 at 2:53 PM, Roshan Naik  wrote:

> @Gwen
>
>  - A failure in delivery of one or more events in the batch (typical Flume
> case) is considered a failure of the entire batch and the client
> redelivers the entire batch.
>  - If clients want more fine grained control, alternative option is to
> indicate which events failed in the return value of  producer.send(list<>)
>
>
> @Joel
>   Fine grained tracking of status of individual events is quite painful in
> contrast to simply blocking on every batch. Old style Batched-sync mode
> has great advantages in terms of simplicity and performance.
>   Imagine a simple use case of client simply reading a directory of log
> files and splitting them into log messages/events and pushing them through
> kafka. Becomes more complex when all this tracking data needs to be
> persisted to accommodate for client restarts/crashes. Tracking a simple
> current line number and file name is easy to program with and persist,
> as opposed to the start/end position of each log message in the
> file.
>
>
> -roshan
>
>
>
>
>
> On 4/27/15 2:07 PM, "Joel Koshy"  wrote:
>
> >As long as you retain the returned futures somewhere, you can always
> >iterate over the futures after the flush completes and check for
> >success/failure. Would that work for you?
> >
> >On Mon, Apr 27, 2015 at 08:53:36PM +, Roshan Naik wrote:
> >> The important guarantee that is needed for a client producer thread is
> >> that it requires an indication of success/failure of the batch of events
> >> it pushed. Essentially it needs to retry producer.send() on that same
> >> batch in case of failure. My understanding is that flush will simply
> >>flush
> >> data from all threads (correct me if I am wrong).
> >>
> >> -roshan
> >>
> >>
> >>
> >> On 4/27/15 1:36 PM, "Joel Koshy"  wrote:
> >>
> >> >This sounds like flush:
> >>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+me
> >>>th
> >> >od+to+the+producer+API
> >> >
> >> >which was recently implemented in trunk.
> >> >
> >> >Joel
> >> >
> >> >On Mon, Apr 27, 2015 at 08:19:40PM +, Roshan Naik wrote:
> >> >> Been evaluating the perf of old and new Producer APIs for reliable
> >>high
> >> >>volume streaming data movement. I do see one area of improvement that
> >> >>the new API could use for synchronous clients.
> >> >>
> >> >> AFAICT, the new API does not support batched synchronous transfers.
> >>To
> >> >>do synchronous send, one needs to do a future.get() after every
> >> >>Producer.send(). I changed the new
> >> >>o.a.k.clients.tools.ProducerPerformance tool to assess the perf of this
> >> >>mode of operation. May not be surprising that it is much slower than the
> >> >>async mode... hard to push it beyond 4MB/s.
> >> >>
> >> >> The 0.8.1 Scala based producer API supported a batched sync mode via
> >> >>Producer.send( List<KeyedMessage> ). My measurements show that it was
> >> >>able to approach (and sometimes exceed) the old async speeds...
> >>266MB/s
> >> >>
> >> >>
> >> >> Supporting this batched sync mode is very critical for streaming
> >> >>clients (such as flume for example) that need delivery guarantees.
> >> >>Although it can be done with async mode, it requires additional
> >> >>bookkeeping as to which events are delivered and which ones are not. The
> >> >>programming model becomes much simpler with the batched sync mode.
> >> >>Client having to deal with one single future.get() helps performance
> >> >>greatly too as I noted.
> >> >>
> >> >> Wanted to propose adding this as an enhancement to the new Producer
> >>API.
> >> >
> >>
> >
>
>


Re: New Producer API - batched sync mode support

2015-04-27 Thread Gwen Shapira
I should have been clearer - I used Roshan's terminology in my reply.

Basically, the old producer "batch" Send() just took a sequence of
messages. I assumed Roshan is looking for something similar - which allows
for mixing messages for multiple partitions and therefore can fail for some
messages and succeed for others.

This is unrelated to MessageSet, which is for a specific partition and
indeed fails or succeeds as a whole.

For completeness - the internal RecordAccumulator component of the new
KafkaProducer does manage a separate batch for each partition, and these
batches should succeed or fail as a whole. I'm not sure I want to expose
this level of implementation detail in our API though.

Gwen


On Mon, Apr 27, 2015 at 2:36 PM, Magnus Edenhill  wrote:

> Hi Gwen,
>
> can you clarify: by batch do you mean the protocol MessageSet, or some java
> client internal construct?
> If the former, I was under the impression that a produced MessageSet either
> succeeds delivery or errors in its entirety on the broker.
>
> Thanks,
> Magnus
>
>
> 2015-04-27 23:05 GMT+02:00 Gwen Shapira :
>
> > Batch failure is a bit meaningless, since in the same batch, some records
> > can succeed and others may fail.
> > To implement an error handling logic (usually different than retry, since
> > the producer has a configuration controlling retries), we recommend using
> > the callback option of Send().
> >
> > Gwen
> >
> > P.S
> > Awesome seeing you here, Roshan :)
> >
> > On Mon, Apr 27, 2015 at 1:53 PM, Roshan Naik 
> > wrote:
> >
> > > The important guarantee that is needed for a client producer thread is
> > > that it requires an indication of success/failure of the batch of
> events
> > > it pushed. Essentially it needs to retry producer.send() on that same
> > > batch in case of failure. My understanding is that flush will simply
> > flush
> > > data from all threads (correct me if I am wrong).
> > >
> > > -roshan
> > >
> > >
> > >
> > > On 4/27/15 1:36 PM, "Joel Koshy"  wrote:
> > >
> > > >This sounds like flush:
> > > >
> > >
> >
> > > >https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
> > > >
> > > >which was recently implemented in trunk.
> > > >
> > > >Joel
> > > >
> > > >On Mon, Apr 27, 2015 at 08:19:40PM +, Roshan Naik wrote:
> > > >> Been evaluating the perf of old and new Producer APIs for reliable
> high
> > > >>volume streaming data movement. I do see one area of improvement that
> > > >>the new API could use for synchronous clients.
> > > >>
> > > >> AFAICT, the new API does not support batched synchronous transfers.
> To
> > > >>do synchronous send, one needs to do a future.get() after every
> > > >>Producer.send(). I changed the new
> > > >>o.a.k.clients.tools.ProducerPerformance tool to assess the perf of this
> > > >>mode of operation. May not be surprising that it is much slower than the
> > > >>async mode... hard to push it beyond 4MB/s.
> > > >>
> > > >> The 0.8.1 Scala based producer API supported a batched sync mode via
> > > >>Producer.send( List<KeyedMessage> ). My measurements show that it was
> > > >>able to approach (and sometimes exceed) the old async speeds...
> 266MB/s
> > > >>
> > > >>
> > > >> Supporting this batched sync mode is very critical for streaming
> > > >>clients (such as flume for example) that need delivery guarantees.
> > > >>Although it can be done with async mode, it requires additional
> > > >>bookkeeping as to which events are delivered and which ones are not. The
> > > >>programming model becomes much simpler with the batched sync mode.
> > > >>Client having to deal with one single future.get() helps performance
> > > >>greatly too as I noted.
> > > >>
> > > >> Wanted to propose adding this as an enhancement to the new Producer
> > API.
> > > >
> > >
> > >
> >
>


Re: New Producer API - batched sync mode support

2015-04-27 Thread Roshan Naik
@Gwen

 - A failure in delivery of one or more events in the batch (typical Flume
case) is considered a failure of the entire batch and the client
redelivers the entire batch.
 - If clients want more fine grained control, alternative option is to
indicate which events failed in the return value of  producer.send(list<>)


@Joel
  Fine-grained tracking of the status of individual events is quite painful in
contrast to simply blocking on every batch. The old-style batched-sync mode
has great advantages in terms of simplicity and performance.
  Imagine a simple use case of a client reading a directory of log
files, splitting them into log messages/events, and pushing them through
kafka. It becomes more complex when all this tracking data needs to be
persisted to accommodate client restarts/crashes. Tracking a simple
current line number and file name is easy to program with and persist
to accommodate... as opposed to the start/end position of each log message
in the file. 


-roshan
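
For what it's worth, the whole-batch contract can be approximated on the new
API roughly as below. This is only a sketch: it assumes the flush() method
from KIP-8 is available, and MAX_ATTEMPTS, the String types, and the error
handling are illustrative.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.*;

// Emulate batched-sync: the entire batch is resent if any record fails,
// matching the redeliver-the-whole-batch contract described above (records
// that did succeed will be duplicated on a retry).
static final int MAX_ATTEMPTS = 3;   // illustrative

static void sendBatch(KafkaProducer<String, String> producer,
                      List<ProducerRecord<String, String>> batch) throws Exception {
    for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
        List<Future<RecordMetadata>> futures =
                new ArrayList<Future<RecordMetadata>>(batch.size());
        for (ProducerRecord<String, String> r : batch)
            futures.add(producer.send(r));
        producer.flush();                  // block until the batch completes
        boolean allOk = true;
        for (Future<RecordMetadata> f : futures) {
            try { f.get(); } catch (ExecutionException e) { allOk = false; }
        }
        if (allOk) return;                 // whole batch delivered
    }
    throw new RuntimeException("batch not delivered after " + MAX_ATTEMPTS + " attempts");
}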





On 4/27/15 2:07 PM, "Joel Koshy"  wrote:

>As long as you retain the returned futures somewhere, you can always
>iterate over the futures after the flush completes and check for
>success/failure. Would that work for you?
>
>On Mon, Apr 27, 2015 at 08:53:36PM +, Roshan Naik wrote:
>> The important guarantee that is needed for a client producer thread is
>> that it requires an indication of success/failure of the batch of events
>> it pushed. Essentially it needs to retry producer.send() on that same
>> batch in case of failure. My understanding is that flush will simply
>>flush
>> data from all threads (correct me if I am wrong).
>> 
>> -roshan
>> 
>> 
>> 
>> On 4/27/15 1:36 PM, "Joel Koshy"  wrote:
>> 
>> >This sounds like flush:
>> 
>>>https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+me
>>>th
>> >od+to+the+producer+API
>> >
>> >which was recently implemented in trunk.
>> >
>> >Joel
>> >
>> >On Mon, Apr 27, 2015 at 08:19:40PM +, Roshan Naik wrote:
>> >> Been evaluating the perf of old and new Produce APIs for reliable
>>high
>> >>volume streaming data movement. I do see one area of improvement that
>> >>the new API could use for synchronous clients.
>> >> 
>> >> AFAIKT, the new API does not support batched synchronous transfers.
>>To
>> >>do synchronous send, one needs to do a future.get() after every
>> >>Producer.send(). I changed the new
>> >>o.a.k.clients.tools.ProducerPerformance tool to asses the perf of this
>> >>mode of operation. May not be surprising that it much slower than the
>> >>async mode... hard t push it beyond 4MB/s.
>> >> 
>> >> The 0.8.1 Scala based producer API supported a batched sync mode via
>> >>Producer.send( List ) . My measurements show that it was
>> >>able to approach (and sometimes exceed) the old async speeds...
>>266MB/s
>> >> 
>> >> 
>> >> Supporting this batched sync mode is very critical for streaming
>> >>clients (such as flume for example) that need delivery guarantees.
>> >>Although it can be done with Async mode, it requires additional book
>> >>keeping as to which events are delivered and which ones are not. The
>> >>programming model becomes much simpler with the batched sync mode.
>> >>Client having to deal with one single future.get() helps performance
>> >>greatly too as I noted.
>> >> 
>> >> Wanted to propose adding this as an enhancement to the new Producer
>>API.
>> >
>> 
>



Re: New Producer API - batched sync mode support

2015-04-27 Thread Magnus Edenhill
Hi Gwen,

can you clarify: by batch do you mean the protocol MessageSet, or some java
client internal construct?
If the former I was under the impression that a produced MessageSet either
succeeds delivery or errors in its entirety on the broker.

Thanks,
Magnus


2015-04-27 23:05 GMT+02:00 Gwen Shapira :

> Batch failure is a bit meaningless, since in the same batch, some records
> can succeed and others may fail.
> To implement an error handling logic (usually different than retry, since
> the producer has a configuration controlling retries), we recommend using
> the callback option of Send().
>
> Gwen
>
> P.S
> Awesome seeing you here, Roshan :)
>
> On Mon, Apr 27, 2015 at 1:53 PM, Roshan Naik 
> wrote:
>
> > The important guarantee that is needed for a client producer thread is
> > that it requires an indication of success/failure of the batch of events
> > it pushed. Essentially it needs to retry producer.send() on that same
> > batch in case of failure. My understanding is that flush will simply
> flush
> > data from all threads (correct me if I am wrong).
> >
> > -roshan
> >
> >
> >
> > On 4/27/15 1:36 PM, "Joel Koshy"  wrote:
> >
> > >This sounds like flush:
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+meth
> > >od+to+the+producer+API
> > >
> > >which was recently implemented in trunk.
> > >
> > >Joel
> > >
> > >On Mon, Apr 27, 2015 at 08:19:40PM +, Roshan Naik wrote:
> > >> Been evaluating the perf of old and new Produce APIs for reliable high
> > >>volume streaming data movement. I do see one area of improvement that
> > >>the new API could use for synchronous clients.
> > >>
> > >> AFAIKT, the new API does not support batched synchronous transfers. To
> > >>do synchronous send, one needs to do a future.get() after every
> > >>Producer.send(). I changed the new
> > >>o.a.k.clients.tools.ProducerPerformance tool to asses the perf of this
> > >>mode of operation. May not be surprising that it much slower than the
> > >>async mode... hard t push it beyond 4MB/s.
> > >>
> > >> The 0.8.1 Scala based producer API supported a batched sync mode via
> > >>Producer.send( List ) . My measurements show that it was
> > >>able to approach (and sometimes exceed) the old async speeds... 266MB/s
> > >>
> > >>
> > >> Supporting this batched sync mode is very critical for streaming
> > >>clients (such as flume for example) that need delivery guarantees.
> > >>Although it can be done with Async mode, it requires additional book
> > >>keeping as to which events are delivered and which ones are not. The
> > >>programming model becomes much simpler with the batched sync mode.
> > >>Client having to deal with one single future.get() helps performance
> > >>greatly too as I noted.
> > >>
> > >> Wanted to propose adding this as an enhancement to the new Producer
> API.
> > >
> >
> >
>


Re: New Producer API - batched sync mode support

2015-04-27 Thread Joel Koshy
As long as you retain the returned futures somewhere, you can always
iterate over the futures after the flush completes and check for
success/failure. Would that work for you?

On Mon, Apr 27, 2015 at 08:53:36PM +, Roshan Naik wrote:
> The important guarantee that is needed for a client producer thread is
> that it requires an indication of success/failure of the batch of events
> it pushed. Essentially it needs to retry producer.send() on that same
> batch in case of failure. My understanding is that flush will simply flush
> data from all threads (correct me if I am wrong).
> 
> -roshan
> 
> 
> 
> On 4/27/15 1:36 PM, "Joel Koshy"  wrote:
> 
> >This sounds like flush:
> >https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+meth
> >od+to+the+producer+API
> >
> >which was recently implemented in trunk.
> >
> >Joel
> >
> >On Mon, Apr 27, 2015 at 08:19:40PM +, Roshan Naik wrote:
> >> Been evaluating the perf of old and new Produce APIs for reliable high
> >>volume streaming data movement. I do see one area of improvement that
> >>the new API could use for synchronous clients.
> >> 
> >> AFAIKT, the new API does not support batched synchronous transfers. To
> >>do synchronous send, one needs to do a future.get() after every
> >>Producer.send(). I changed the new
> >>o.a.k.clients.tools.ProducerPerformance tool to asses the perf of this
> >>mode of operation. May not be surprising that it much slower than the
> >>async mode... hard t push it beyond 4MB/s.
> >> 
> >> The 0.8.1 Scala based producer API supported a batched sync mode via
> >>Producer.send( List ) . My measurements show that it was
> >>able to approach (and sometimes exceed) the old async speeds... 266MB/s
> >> 
> >> 
> >> Supporting this batched sync mode is very critical for streaming
> >>clients (such as flume for example) that need delivery guarantees.
> >>Although it can be done with Async mode, it requires additional book
> >>keeping as to which events are delivered and which ones are not. The
> >>programming model becomes much simpler with the batched sync mode.
> >>Client having to deal with one single future.get() helps performance
> >>greatly too as I noted.
> >> 
> >> Wanted to propose adding this as an enhancement to the new Producer API.
> >
> 



Re: New Producer API - batched sync mode support

2015-04-27 Thread Gwen Shapira
Batch failure is a bit meaningless, since in the same batch, some records
can succeed and others may fail.
To implement an error handling logic (usually different than retry, since
the producer has a configuration controlling retries), we recommend using
the callback option of Send().

Gwen

P.S
Awesome seeing you here, Roshan :)
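
For reference, a minimal sketch of the callback option on the new (0.8.2+)
KafkaProducer; the broker address, topic, and error handling below are
illustrative only:

import java.util.Properties;
import org.apache.kafka.clients.producer.*;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");   // illustrative
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);

// The callback fires once per record, so success/failure is observed per
// record rather than per batch.
producer.send(new ProducerRecord<String, String>("test", "hello"),
    new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                // this record failed after the producer's own retries;
                // apply whatever error handling the application needs
                exception.printStackTrace();
            }
        }
    });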

On Mon, Apr 27, 2015 at 1:53 PM, Roshan Naik  wrote:

> The important guarantee that is needed for a client producer thread is
> that it requires an indication of success/failure of the batch of events
> it pushed. Essentially it needs to retry producer.send() on that same
> batch in case of failure. My understanding is that flush will simply flush
> data from all threads (correct me if I am wrong).
>
> -roshan
>
>
>
> On 4/27/15 1:36 PM, "Joel Koshy"  wrote:
>
> >This sounds like flush:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+meth
> >od+to+the+producer+API
> >
> >which was recently implemented in trunk.
> >
> >Joel
> >
> >On Mon, Apr 27, 2015 at 08:19:40PM +, Roshan Naik wrote:
> >> Been evaluating the perf of old and new Produce APIs for reliable high
> >>volume streaming data movement. I do see one area of improvement that
> >>the new API could use for synchronous clients.
> >>
> >> AFAIKT, the new API does not support batched synchronous transfers. To
> >>do synchronous send, one needs to do a future.get() after every
> >>Producer.send(). I changed the new
> >>o.a.k.clients.tools.ProducerPerformance tool to asses the perf of this
> >>mode of operation. May not be surprising that it much slower than the
> >>async mode... hard t push it beyond 4MB/s.
> >>
> >> The 0.8.1 Scala based producer API supported a batched sync mode via
> >>Producer.send( List ) . My measurements show that it was
> >>able to approach (and sometimes exceed) the old async speeds... 266MB/s
> >>
> >>
> >> Supporting this batched sync mode is very critical for streaming
> >>clients (such as flume for example) that need delivery guarantees.
> >>Although it can be done with Async mode, it requires additional book
> >>keeping as to which events are delivered and which ones are not. The
> >>programming model becomes much simpler with the batched sync mode.
> >>Client having to deal with one single future.get() helps performance
> >>greatly too as I noted.
> >>
> >> Wanted to propose adding this as an enhancement to the new Producer API.
> >
>
>


Re: New Producer API - batched sync mode support

2015-04-27 Thread Roshan Naik
The important guarantee that is needed for a client producer thread is
that it requires an indication of success/failure of the batch of events
it pushed. Essentially it needs to retry producer.send() on that same
batch in case of failure. My understanding is that flush will simply flush
data from all threads (correct me if I am wrong).

-roshan



On 4/27/15 1:36 PM, "Joel Koshy"  wrote:

>This sounds like flush:
>https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+meth
>od+to+the+producer+API
>
>which was recently implemented in trunk.
>
>Joel
>
>On Mon, Apr 27, 2015 at 08:19:40PM +, Roshan Naik wrote:
>> Been evaluating the perf of old and new Produce APIs for reliable high
>>volume streaming data movement. I do see one area of improvement that
>>the new API could use for synchronous clients.
>> 
>> AFAIKT, the new API does not support batched synchronous transfers. To
>>do synchronous send, one needs to do a future.get() after every
>>Producer.send(). I changed the new
>>o.a.k.clients.tools.ProducerPerformance tool to asses the perf of this
>>mode of operation. May not be surprising that it much slower than the
>>async mode... hard t push it beyond 4MB/s.
>> 
>> The 0.8.1 Scala based producer API supported a batched sync mode via
>>Producer.send( List ) . My measurements show that it was
>>able to approach (and sometimes exceed) the old async speeds... 266MB/s
>> 
>> 
>> Supporting this batched sync mode is very critical for streaming
>>clients (such as flume for example) that need delivery guarantees.
>>Although it can be done with Async mode, it requires additional book
>>keeping as to which events are delivered and which ones are not. The
>>programming model becomes much simpler with the batched sync mode.
>>Client having to deal with one single future.get() helps performance
>>greatly too as I noted.
>> 
>> Wanted to propose adding this as an enhancement to the new Producer API.
>



Re: New Producer API - batched sync mode support

2015-04-27 Thread Joel Koshy
This sounds like flush:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API

which was recently implemented in trunk.

Joel

On Mon, Apr 27, 2015 at 08:19:40PM +, Roshan Naik wrote:
> Been evaluating the perf of old and new Produce APIs for reliable high volume 
> streaming data movement. I do see one area of improvement that the new API 
> could use for synchronous clients.
> 
> AFAIKT, the new API does not support batched synchronous transfers. To do 
> synchronous send, one needs to do a future.get() after every Producer.send(). 
> I changed the new o.a.k.clients.tools.ProducerPerformance tool to asses the 
> perf of this mode of operation. May not be surprising that it much slower 
> than the async mode... hard t push it beyond 4MB/s.
> 
> The 0.8.1 Scala based producer API supported a batched sync mode via 
> Producer.send( List ) . My measurements show that it was able 
> to approach (and sometimes exceed) the old async speeds... 266MB/s
> 
> 
> Supporting this batched sync mode is very critical for streaming clients 
> (such as flume for example) that need delivery guarantees. Although it can be 
> done with Async mode, it requires additional book keeping as to which events 
> are delivered and which ones are not. The programming model becomes much 
> simpler with the batched sync mode. Client having to deal with one single 
> future.get() helps performance greatly too as I noted.
> 
> Wanted to propose adding this as an enhancement to the new Producer API.



New Producer API - batched sync mode support

2015-04-27 Thread Roshan Naik
Been evaluating the perf of the old and new Producer APIs for reliable high volume 
streaming data movement. I do see one area of improvement that the new API 
could use for synchronous clients.

AFAICT, the new API does not support batched synchronous transfers. To do a 
synchronous send, one needs to do a future.get() after every Producer.send(). I 
changed the new o.a.k.clients.tools.ProducerPerformance tool to assess the perf 
of this mode of operation. It may not be surprising that it is much slower than 
the async mode... hard to push it beyond 4MB/s.

The 0.8.1 Scala based producer API supported a batched sync mode via 
Producer.send( List<KeyedMessage> ). My measurements show that it was able to 
approach (and sometimes exceed) the old async speeds... 266MB/s


Supporting this batched sync mode is very critical for streaming clients (such 
as flume for example) that need delivery guarantees. Although it can be done 
with async mode, it requires additional bookkeeping as to which events are 
delivered and which ones are not. The programming model becomes much simpler 
with the batched sync mode. The client having to deal with one single 
future.get() helps performance greatly too, as I noted.

Wanted to propose adding this as an enhancement to the new Producer API.


Re: Batching with new Producer API

2015-02-26 Thread Akshat Aranya
Oh, that makes a lot more sense!  I assumed that the batch size was in
terms of number of messages, not the number of bytes because it was so
small.  What would be a reasonable value to use? Will 1-2 MB be too large
and bursty?
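
For reference, the knobs involved look like this on the new producer. The
values below are illustrative starting points, not a recommendation from this
thread; larger batches trade some latency and burstiness for throughput.

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");   // illustrative
// batch.size is in bytes and applies per partition; 1 MB holds roughly
// 3000 of these ~310 byte messages
props.put("batch.size", "1048576");
// let the producer wait briefly to fill a batch before sending
props.put("linger.ms", "10");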

On Thu, Feb 26, 2015 at 10:07 AM, Harsha  wrote:

> Akshat,
>Produce.batch_size is in bytes and if your messages avg size is
>310 bytes and your current number of messages per batch is 46 you
>are getting close to the max batch size 16384. Did you try
>increasing the producer batch_size bytes?
> -Harsha
>
> On Thu, Feb 26, 2015, at 09:49 AM, Akshat Aranya wrote:
> > Hi,
> >
> > I am using the new Producer API in Kafka 0.8.2. I am writing messages to
> > Kafka that are ~310 bytes long with the same partition key to one single
> > .
> > I'm mostly using the default Producer config, which sets the max batch
> > size
> > to 16,384.  However, looking at the JMX stats on the broker side, I see
> > that I'm only getting an average batch size of 46.  I also tried
> > increasing
> > the linger.ms value to 100ms (default is 0), but that didn't help
> either.
> > Is there something else that I can tune that will improve write batching?
> >
> > Thanks,
> > Akshat
>


Re: Batching with new Producer API

2015-02-26 Thread Harsha
Akshat,
   The producer batch.size is in bytes, and if your messages' avg size is
   310 bytes and your current number of messages per batch is 46, you
   are getting close to the max batch size of 16384 (46 x ~310 bytes is
   roughly 14 KB). Did you try increasing the producer batch.size bytes?
-Harsha

On Thu, Feb 26, 2015, at 09:49 AM, Akshat Aranya wrote:
> Hi,
> 
> I am using the new Producer API in Kafka 0.8.2. I am writing messages to
> Kafka that are ~310 bytes long with the same partition key to one single
> .
> I'm mostly using the default Producer config, which sets the max batch
> size
> to 16,384.  However, looking at the JMX stats on the broker side, I see
> that I'm only getting an average batch size of 46.  I also tried
> increasing
> the linger.ms value to 100ms (default is 0), but that didn't help either.
> Is there something else that I can tune that will improve write batching?
> 
> Thanks,
> Akshat


Batching with new Producer API

2015-02-26 Thread Akshat Aranya
Hi,

I am using the new Producer API in Kafka 0.8.2. I am writing messages to
Kafka that are ~310 bytes long with the same partition key to one single .
I'm mostly using the default Producer config, which sets the max batch size
to 16,384.  However, looking at the JMX stats on the broker side, I see
that I'm only getting an average batch size of 46.  I also tried increasing
the linger.ms value to 100ms (default is 0), but that didn't help either.
Is there something else that I can tune that will improve write batching?

Thanks,
Akshat


Re: new producer api and batched Futures....

2014-11-20 Thread Jason Rosenberg
I guess it would make the api less clean, but I can imagine a sendBatch
method, which returns a single Future that gets triggered only when all
messages in the batch were finished.  The callback info could then contain
info about the success/exceptions encountered by each sub-group of
messages.  And the callback could even be called multiple times, once for
each sub-batch sent.   It gets complicated to think about it, but it would
be fewer Future objects created and less async contention/waiting, etc.

I'll try it out and see

Jason

On Thu, Nov 20, 2014 at 7:56 PM, Jay Kreps  wrote:

> Internally it works as you describe, there is only one CountDownLatch per
> batch sent, each of the futures is just a wrapper around that.
>
> It is true that if you accumulate thousands of futures in a list that may
> be a fair number of objects you are retaining, and there will be some work
> involved in checking them all. If you are sure they are all going to the
> same partition you can actually wait on the last future since sends are
> ordered within a partition. So when the final send completes the prior
> sends should also have completed.
>
> Either way if you see a case where the new producer isn't as fast as the
> old producer let us know.
>
> -Jay
>
>
>
> On Thu, Nov 20, 2014 at 4:24 PM, Jason Rosenberg  wrote:
>
> > I've been looking at the new producer api with anticipation, but have not
> > fired it up yet.
> >
> > One question I have, is it looks like there's no longer a 'batch' send
> mode
> > (and I get that this is all now handled internally, e.g. you send
> > individual messages, that then get collated and batched up and sent out).
> >
> > What I'm wondering, is whether there's added overhead in the producer
> (and
> > the client code) having to manage all the Future return Objects from all
> > the individual messages sent?  If I'm sending 100K messages/second, etc.,
> > that seems like a lot of async Future Objects that have to be tickled,
> and
> > waited for, etc.  Does not this cause some overhead?
> >
> > If I send a bunch of messages and then store all the Future's in a list,
> > and then wait for all of them, it seems like a lot of thread contention.
> > On the other hand, if I send a batch of messages, that are likely all to
> > get sent as a single batch over the wire (cuz they are all going to the
> > same partition), wouldn't there be some benefit in only having to wait
> for
> > a single Future Object for the batch?
> >
> > Jason
> >
>


Re: new producer api and batched Futures....

2014-11-20 Thread Jay Kreps
Internally it works as you describe, there is only one CountDownLatch per
batch sent, each of the futures is just a wrapper around that.

It is true that if you accumulate thousands of futures in a list that may
be a fair number of objects you are retaining, and there will be some work
involved in checking them all. If you are sure they are all going to the
same partition you can actually wait on the last future since sends are
ordered within a partition. So when the final send completes the prior
sends should also have completed.

Either way if you see a case where the new producer isn't as fast as the
old producer let us know.

-Jay
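
A sketch of the shortcut Jay describes, assuming every record in the list is
bound for the same partition (the String types and method name are
illustrative):

import java.util.List;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.*;

static void sendAndAwait(KafkaProducer<String, String> producer,
                         List<ProducerRecord<String, String>> records)
        throws Exception {
    Future<RecordMetadata> last = null;
    for (ProducerRecord<String, String> r : records)
        last = producer.send(r);   // all records map to the same partition
    if (last != null)
        last.get();   // sends are ordered within a partition, so when the
                      // last send completes, the earlier ones have completed
    // Note: this confirms completion, not per-record success; use callbacks
    // if individual failures must be detected.
}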



On Thu, Nov 20, 2014 at 4:24 PM, Jason Rosenberg  wrote:

> I've been looking at the new producer api with anticipation, but have not
> fired it up yet.
>
> One question I have, is it looks like there's no longer a 'batch' send mode
> (and I get that this is all now handled internally, e.g. you send
> individual messages, that then get collated and batched up and sent out).
>
> What I'm wondering, is whether there's added overhead in the producer (and
> the client code) having to manage all the Future return Objects from all
> the individual messages sent?  If I'm sending 100K messages/second, etc.,
> that seems like a lot of async Future Objects that have to be tickled, and
> waited for, etc.  Does not this cause some overhead?
>
> If I send a bunch of messages and then store all the Future's in a list,
> and then wait for all of them, it seems like a lot of thread contention.
> On the other hand, if I send a batch of messages, that are likely all to
> get sent as a single batch over the wire (cuz they are all going to the
> same partition), wouldn't there be some benefit in only having to wait for
> a single Future Object for the batch?
>
> Jason
>


new producer api and batched Futures....

2014-11-20 Thread Jason Rosenberg
I've been looking at the new producer api with anticipation, but have not
fired it up yet.

One question I have, is it looks like there's no longer a 'batch' send mode
(and I get that this is all now handled internally, e.g. you send
individual messages, that then get collated and batched up and sent out).

What I'm wondering, is whether there's added overhead in the producer (and
the client code) having to manage all the Future return Objects from all
the individual messages sent?  If I'm sending 100K messages/second, etc.,
that seems like a lot of async Future Objects that have to be tickled, and
waited for, etc.  Doesn't this cause some overhead?

If I send a bunch of messages and then store all the Future's in a list,
and then wait for all of them, it seems like a lot of thread contention.
On the other hand, if I send a batch of messages, that are likely all to
get sent as a single batch over the wire (cuz they are all going to the
same partition), wouldn't there be some benefit in only having to wait for
a single Future Object for the batch?

Jason


Re: how to know kafka producer api status

2014-05-16 Thread Yonghui Zhao
Seems email archive doesn't work now in
http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/browser

I also sent an email to users-subscr...@kafka.apache.org to subscribe to
this group, and it doesn't work either.


2014-05-09 16:00 GMT+08:00 Yonghui Zhao :

>
> If l use java producer api in sync mode.
>
>   public void send(kafka.producer.KeyedMessage message) { /* compiled
> code */ }
>
> How to know whether a send process is successful or failed?
>
> For example if the kafka broker disk is not accessible , will it throw
> exceptions?
>
>


Re: how to know kafka producer api status

2014-05-16 Thread Guozhang Wang
Hi Yonghui,

If you set producer.type = sync, then the send() call will not return until
it has received the ack from the broker. If the response contains any
error code, it will retry the send until all retries are exhausted, and then
it will throw an exception.

Guozhang
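
A minimal sketch of that behavior with the 0.8 producer's Java API. The
config values are illustrative; FailedToSendMessageException is the exception
the sync producer throws once its retries are exhausted.

import java.util.Properties;
import kafka.common.FailedToSendMessageException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092");   // illustrative
props.put("producer.type", "sync");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("message.send.max.retries", "3");

Producer<String, String> producer =
        new Producer<String, String>(new ProducerConfig(props));
try {
    producer.send(new KeyedMessage<String, String>("test", "hello"));
    // send() returned normally: the broker acked the message
} catch (FailedToSendMessageException e) {
    // all retries were exhausted; the message was not delivered
}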


On Fri, May 9, 2014 at 4:18 AM, Yonghui Zhao  wrote:

> Seems email archive doesn't work now in
> http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/browser
>
> And also I sent email to users-subscr...@kafka.apache.orgto  subscribe
> this group, it doesn't work either.
>
>
> 2014-05-09 16:00 GMT+08:00 Yonghui Zhao :
>
> >
> > If l use java producer api in sync mode.
> >
> >   public void send(kafka.producer.KeyedMessage message) { /*
> compiled
> > code */ }
> >
> > How to know whether a send process is successful or failed?
> >
> > For example if the kafka broker disk is not accessible , will it throw
> > exceptions?
> >
> >
>



-- 
-- Guozhang


Re: how to know kafka producer api status

2014-05-16 Thread Timothy Chen
It typically throws an exception in the end if the sync producer cannot
deliver your message.

In the case where there is an IOException or a similar exception that
the broker cannot deal with, I believe it will try to return an
UnknownError response, which will then be thrown in the producer.

In cases where it receives error codes that the producer can recover
from (i.e. NotLeaderForPartition), it simply retries up to the
configured max retries.

Tim

On Fri, May 9, 2014 at 1:00 AM, Yonghui Zhao  wrote:
> If l use java producer api in sync mode.
>
>   public void send(kafka.producer.KeyedMessage message) { /* compiled
> code */ }
>
> How to know whether a send process is successful or failed?
>
> For example if the kafka broker disk is not accessible , will it throw
> exceptions?


Re: how to know kafka producer api status

2014-05-16 Thread Jun Rao
Yes, in sync mode, if send() fails, an exception will be thrown.

Thanks,

Jun


On Fri, May 9, 2014 at 1:00 AM, Yonghui Zhao  wrote:

> If l use java producer api in sync mode.
>
>   public void send(kafka.producer.KeyedMessage message) { /* compiled
> code */ }
>
> How to know whether a send process is successful or failed?
>
> For example if the kafka broker disk is not accessible , will it throw
> exceptions?
>


how to know kafka producer api status

2014-05-15 Thread Yonghui Zhao
If I use the java producer api in sync mode:

  public void send(kafka.producer.KeyedMessage<K, V> message) { /* compiled
code */ }

How do I know whether a send was successful or failed?

For example, if the kafka broker disk is not accessible, will it throw
exceptions?


Re: Re: why kafka producer api use cpu so high?

2014-05-11 Thread Eric Sammer
If a process is CPU bound (which this producer almost certainly will be),
it's going to consume as much CPU as it can to do what it does. The
test is flawed. Because there's no end state, the while loop is just going
to burn CPU and, because it's singly threaded, it will take a single core.
A better test is to find out a rough number of events per second your
process needs to produce and write the test accordingly. That will tell
you how much the producer will chew up when producing ~50MB/sec worth of
events.

The other thing worth pointing out is that sending a single event at a time
comes with a fair bit of overhead which, in turn, naturally drives up CPU
time. If you use the list form of send() you're going to amortize the
cost of the RPC and other internal bits, leading to more efficient use of
system resources. Again, it may still burn a full core because what you're
doing is CPU bound, but it will do more during that time.
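
A sketch of the list form of send() on the 0.8 producer, assuming a
Producer<String, String> named "producer" as in the code earlier in the
thread (the batch size of 200 is illustrative):

import java.util.ArrayList;
import java.util.List;
import kafka.producer.KeyedMessage;

// Accumulate messages client-side and hand them to send() in one call, so
// the per-call overhead is amortized across the whole batch.
List<KeyedMessage<String, String>> batch =
        new ArrayList<KeyedMessage<String, String>>(200);
for (int i = 0; i < 200; i++)
    batch.add(new KeyedMessage<String, String>("test", "this is a sample"));
producer.send(batch);   // one call, 200 messages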


On Sun, May 11, 2014 at 1:04 AM,  wrote:

> I use snappy for compression.
> but even without compression, this procedure also use 50% one core cpu.
>
> when using snappy ,this procedure use 100% one core cpu.
>
>
>
>
>
> From: Timothy Chen
> Date: 2014-05-11 15:53
> To: users@kafka.apache.org
> Subject: Re: why kafka producer api use cpu so high?
> What is your compression configuration for your producer?
>
> One of the biggest CPU source for the producer is doing compression
> and also checksuming.
>
> Tim
>
> On Sun, May 11, 2014 at 12:24 AM,   wrote:
> > I write a very simple code , like this :
> > public class LogProducer {
> >
> > private Producer inner;
> > public LogProducer() throws Exception{
> > Properties properties = new Properties();
> >
> properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
> > ProducerConfig config = new ProducerConfig(properties);
> > inner = new Producer(config);
> > }
> >
> >
> > public void send(String topicName,String message) {
> > if(topicName == null || message == null){
> > return;
> > }
> > KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
> > inner.send(km);
> > }
> > public void close(){
> > inner.close();
> > }
> >
> > /**
> >  * @param args
> >  */
> > public static void main(String[] args) {
> > LogProducer producer = null;
> > try{
> > producer = new LogProducer();
> > int i=0;
> > while(true){
> > producer.send("test", "this is a
> sample");
> > }
> > }catch(Exception e){
> > e.printStackTrace();
> > }finally{
> > if(producer != null){
> > producer.close();
> > }
> > }
> >
> > }
> >
> > }
> > ~~
> > and the producer.properties like this:
> > metadata.broker.list=127.0.0.1:9092
> > producer.type=async
> > serializer.class=kafka.serializer.StringEncoder
> > batch.num.messages=200
> > compression.codec=snappy
> >
> > I run this procedure on linux, which is 4 core cpu , 16GB memory.
> > I find this procedure using one core cpu totally , this is "top" command
> ouput:
> >
> >
> > [root@localhost ~]# top
> > top - 13:51:09 up 5 days, 13:27,  3 users,  load average: 0.96, 0.48,
> 0.35
> > Tasks: 367 total,   3 running, 364 sleeping,   0 stopped,   0 zombie
> > Cpu0  :  7.0%us,  0.3%sy,  0.0%ni, 92.0%id,  0.7%wa,  0.0%hi,  0.0%si,
>  0.0%st
> > Cpu1  :  5.0%us,  0.0%sy,  0.0%ni, 95.0%id,  0.0%wa,  0.0%hi,  0.0%si,
>  0.0%st
> > Cpu2  :  5.0%us,  0.0%sy,  0.0%ni, 95.0%id,  0.0%wa,  0.0%hi,  0.0%si,
>  0.0%st
> > Cpu3  : 99.7%us,  0.3%sy,  0.0%ni,  0.0%id,  0.0%wa,  0.0%hi,  0.0%si,
>  0.0%st
> > Mem:  16307528k total,  9398376k used,  6909152k free,   249952k buffers
> > Swap:  8224760k total,0k used,  8224760k free,  6071348k cached
> >
> > why producer api use  cpu so high ? or maybe I make something wrong ?
> >
> > by the way , the kafka version 0.8.0  .
>



-- 
E. Sammer
CTO - ScalingData


Re: Re: why kafka producer api use cpu so high?

2014-05-11 Thread
I use snappy for compression.
But even without compression, this program still uses 50% of one core.

When using snappy, this program uses 100% of one core.





From: Timothy Chen
Date: 2014-05-11 15:53
To: users@kafka.apache.org
Subject: Re: why kafka producer api use cpu so high?
What is your compression configuration for your producer?

One of the biggest CPU source for the producer is doing compression
and also checksuming.

Tim

On Sun, May 11, 2014 at 12:24 AM,   wrote:
> I write a very simple code , like this :
> public class LogProducer {
>
> private Producer inner;
> public LogProducer() throws Exception{
> Properties properties = new Properties();
> 
> properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
> ProducerConfig config = new ProducerConfig(properties);
> inner = new Producer(config);
> }
>
>
> public void send(String topicName,String message) {
> if(topicName == null || message == null){
> return;
> }
> KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
> inner.send(km);
> }
> public void close(){
> inner.close();
> }
>
> /**
>  * @param args
>  */
> public static void main(String[] args) {
> LogProducer producer = null;
> try{
> producer = new LogProducer();
> int i=0;
> while(true){
> producer.send("test", "this is a 
> sample");
> }
> }catch(Exception e){
> e.printStackTrace();
> }finally{
> if(producer != null){
> producer.close();
> }
> }
>
> }
>
> }
> ~~
> and the producer.properties like this:
> metadata.broker.list=127.0.0.1:9092
> producer.type=async
> serializer.class=kafka.serializer.StringEncoder
> batch.num.messages=200
> compression.codec=snappy
>
> I run this procedure on linux, which is 4 core cpu , 16GB memory.
> I find this procedure using one core cpu totally , this is "top" command 
> ouput:
>
>
> [root@localhost ~]# top
> top - 13:51:09 up 5 days, 13:27,  3 users,  load average: 0.96, 0.48, 0.35
> Tasks: 367 total,   3 running, 364 sleeping,   0 stopped,   0 zombie
> Cpu0  :  7.0%us,  0.3%sy,  0.0%ni, 92.0%id,  0.7%wa,  0.0%hi,  0.0%si,  0.0%st
> Cpu1  :  5.0%us,  0.0%sy,  0.0%ni, 95.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
> Cpu2  :  5.0%us,  0.0%sy,  0.0%ni, 95.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
> Cpu3  : 99.7%us,  0.3%sy,  0.0%ni,  0.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
> Mem:  16307528k total,  9398376k used,  6909152k free,   249952k buffers
> Swap:  8224760k total,0k used,  8224760k free,  6071348k cached
>
> why producer api use  cpu so high ? or maybe I make something wrong ?
>
> by the way , the kafka version 0.8.0  .

Re: Re: why kafka producer api use cpu so high?

2014-05-11 Thread
Because my app can generate 50MB of logs every second and one log record is 
about 1KB, I must send these logs as fast as the machine can.

This is difficult: on one hand I want to send logs as fast as possible, 
on the other hand I want the kafka producer api to use as little cpu as possible.

If the kafka api uses this much cpu, it will impact my app.  

So can kafka solve this problem: send 50MB of logs to the kafka server every 
second while using little cpu?





From: cac...@gmail.com
Date: 2014-05-11 16:52
To: users
Subject: Re: why kafka producer api use cpu so high?
This code says to send this message infinitely as fast as the machine can
thereby consuming as much of one CPU as possible. You may want to consider
an alternate test, perhaps one that records the number of messages sent in
a given time period.

> > public static void main(String[] args) {
> > LogProducer producer = null;
> > try{
> > producer = new LogProducer();
> > int i=0;
> > while(true){
> > producer.send("test", "this is a
> sample");
> > }
> > }catch(Exception e){
> > e.printStackTrace();
> > }finally{
> > if(producer != null){
> > producer.close();
> > }
> > }
> >
> > }
> >
> > }
>
>

Re: why kafka producer api use cpu so high?

2014-05-11 Thread cac...@gmail.com
This code says to send this message infinitely as fast as the machine can
thereby consuming as much of one CPU as possible. You may want to consider
an alternate test, perhaps one that records the number of messages sent in
a given time period.
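
Concretely, a sketch of such a test reusing the LogProducer class from the
original post (the 10 second window is illustrative; run this inside a method
declared to throw Exception):

// Send for a fixed window and report the achieved rate, instead of looping
// forever; CPU use can then be judged against real throughput.
LogProducer producer = new LogProducer();
long deadline = System.currentTimeMillis() + 10000;   // 10 second window
long sent = 0;
while (System.currentTimeMillis() < deadline) {
    producer.send("test", "this is a sample");
    sent++;
}
producer.close();
System.out.println("sent " + sent + " messages, ~" + (sent / 10) + " msgs/sec");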

> > public static void main(String[] args) {
> > LogProducer producer = null;
> > try{
> > producer = new LogProducer();
> > int i=0;
> > while(true){
> > producer.send("test", "this is a
> sample");
> > }
> > }catch(Exception e){
> > e.printStackTrace();
> > }finally{
> > if(producer != null){
> > producer.close();
> > }
> > }
> >
> > }
> >
> > }
>
>


Re: why kafka producer api use cpu so high?

2014-05-11 Thread Timothy Chen
What is your compression configuration for your producer?

One of the biggest CPU source for the producer is doing compression
and also checksuming.

Tim

On Sun, May 11, 2014 at 12:24 AM,   wrote:
> I write a very simple code , like this :
> public class LogProducer {
>
> private Producer inner;
> public LogProducer() throws Exception{
> Properties properties = new Properties();
> 
> properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
> ProducerConfig config = new ProducerConfig(properties);
> inner = new Producer(config);
> }
>
>
> public void send(String topicName,String message) {
> if(topicName == null || message == null){
> return;
> }
> KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
> inner.send(km);
> }
> public void close(){
> inner.close();
> }
>
> /**
>  * @param args
>  */
> public static void main(String[] args) {
> LogProducer producer = null;
> try{
> producer = new LogProducer();
> int i=0;
> while(true){
> producer.send("test", "this is a 
> sample");
> }
> }catch(Exception e){
> e.printStackTrace();
> }finally{
> if(producer != null){
> producer.close();
> }
> }
>
> }
>
> }
> ~~
> and the producer.properties like this:
> metadata.broker.list=127.0.0.1:9092
> producer.type=async
> serializer.class=kafka.serializer.StringEncoder
> batch.num.messages=200
> compression.codec=snappy
>
> I run this procedure on linux, which is 4 core cpu , 16GB memory.
> I find this procedure using one core cpu totally , this is "top" command 
> ouput:
>
>
> [root@localhost ~]# top
> top - 13:51:09 up 5 days, 13:27,  3 users,  load average: 0.96, 0.48, 0.35
> Tasks: 367 total,   3 running, 364 sleeping,   0 stopped,   0 zombie
> Cpu0  :  7.0%us,  0.3%sy,  0.0%ni, 92.0%id,  0.7%wa,  0.0%hi,  0.0%si,  0.0%st
> Cpu1  :  5.0%us,  0.0%sy,  0.0%ni, 95.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
> Cpu2  :  5.0%us,  0.0%sy,  0.0%ni, 95.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
> Cpu3  : 99.7%us,  0.3%sy,  0.0%ni,  0.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
> Mem:  16307528k total,  9398376k used,  6909152k free,   249952k buffers
> Swap:  8224760k total,0k used,  8224760k free,  6071348k cached
>
> why producer api use  cpu so high ? or maybe I make something wrong ?
>
> by the way , the kafka version 0.8.0  .


why kafka producer api use cpu so high?

2014-05-11 Thread
I wrote some very simple code, like this:

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class LogProducer {

    private Producer<String, String> inner;

    public LogProducer() throws Exception {
        Properties properties = new Properties();
        properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
        ProducerConfig config = new ProducerConfig(properties);
        inner = new Producer<String, String>(config);
    }

    public void send(String topicName, String message) {
        if (topicName == null || message == null) {
            return;
        }
        KeyedMessage<String, String> km =
                new KeyedMessage<String, String>(topicName, message);
        inner.send(km);
    }

    public void close() {
        inner.close();
    }

    public static void main(String[] args) {
        LogProducer producer = null;
        try {
            producer = new LogProducer();
            while (true) {
                producer.send("test", "this is a sample");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}
~~
and the producer.properties like this:
metadata.broker.list=127.0.0.1:9092
producer.type=async
serializer.class=kafka.serializer.StringEncoder
batch.num.messages=200
compression.codec=snappy

I run this program on linux, which has a 4 core cpu and 16GB of memory.
I find this program completely uses one core; this is the "top" command output:


[root@localhost ~]# top
top - 13:51:09 up 5 days, 13:27,  3 users,  load average: 0.96, 0.48, 0.35
Tasks: 367 total,   3 running, 364 sleeping,   0 stopped,   0 zombie
Cpu0  :  7.0%us,  0.3%sy,  0.0%ni, 92.0%id,  0.7%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu1  :  5.0%us,  0.0%sy,  0.0%ni, 95.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu2  :  5.0%us,  0.0%sy,  0.0%ni, 95.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Cpu3  : 99.7%us,  0.3%sy,  0.0%ni,  0.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
Mem:  16307528k total,  9398376k used,  6909152k free,   249952k buffers
Swap:  8224760k total,0k used,  8224760k free,  6071348k cached

Why does the producer api use so much cpu? Or did I do something wrong?

By the way, the kafka version is 0.8.0.

Re: 0.8.1 Java Producer API Callbacks

2014-05-02 Thread Jay Kreps
This summary is correct. If you are just starting development now it is
probably reasonable to start with the new producer. We would certainly
appreciate any feedback on it.

My recommendation would be to build the producer off trunk as there were a
few bug fixes since 0.8.1.x that are worth getting.

-Jay


On Fri, May 2, 2014 at 9:49 AM, Neha Narkhede wrote:

> Hi Christian,
>
> As Jun mentioned, the new producer is marked beta since it is new. By 0.9
> or even sooner, we'd expect it to be deployed at LinkedIn and a few other
> companies in a stable state. The APIs, guarantees and features will not
> change by 0.9, just the stability.
>
> Thanks,
> Neha
>
>
> On Fri, May 2, 2014 at 7:50 AM, Jun Rao  wrote:
>
> > It's beta mostly because it's new.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Thu, May 1, 2014 at 10:25 PM, cac...@gmail.com 
> > wrote:
> >
> > > Thanks, that's quite helpful. According to this post,
> > > http://blog.empathybox.com/ , it looks like it will be beta then which
> > > seems good enough. Assuming that the beta designation is correct, is
> that
> > > because it won't have as many features/the same flexibility as it's
> > > expected to by 0.9?
> > > Christian
> > >
> > >
> > > On Thu, May 1, 2014 at 9:19 PM, Neha Narkhede  > > >wrote:
> > >
> > > > The javadoc of the new producer is here for now -
> > > >
> > > >
> > >
> >
> http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > >
> > > > On Thu, May 1, 2014 at 9:16 PM, Jun Rao  wrote:
> > > >
> > > > > The new producer (that supports callbacks) is in trunk. It will be
> > > > released
> > > > > in 0.8.2. You can look at the java doc of KafkaProducer for the
> api.
> > > > >
> > > > > Thanks
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Thu, May 1, 2014 at 8:43 PM, Christian Csar 
> > > wrote:
> > > > >
> > > > > > On 05/01/2014 07:22 PM, Christian Csar wrote:
> > > > > > > I'm looking at using the java producer api for 0.8.1 and I'm
> > > slightly
> > > > > > > confused by this passage from section 4.4 of
> > > > > > > https://kafka.apache.org/documentation.html#theproducer
> > > > > > > "Note that as of Kafka 0.8.1 the async producer does not have a
> > > > > > > callback, which could be used to register handlers to catch
> send
> > > > > errors.
> > > > > > > Adding such callback functionality is proposed for Kafka 0.9,
> see
> > > > > > > [Proposed Producer
> > > > > > > API](
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ProposedProducerAPI
> > > > > )
> > > > > > ."
> > > > > > >
> > > > > > > org.apache.kafka.clients.producer.KafkaProducer in 0.8.1 appears
> > > > > > > to have public Future<RecordMetadata> send(ProducerRecord record,
> > > > > > > Callback callback) which looks like the mentioned callback.
> > > > > > >
> > > > > > > How do the callbacks with the async producer? Is it as
> described
> > in
> > > > the
> > > > > > > comment on the send method (see
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L151
> > > > > > > for reference)?
> > > > > > >
> > > > > > > Looking around it seems plausible the language in the
> > documentation
> > > > > > > might refer to a separate sort of callback that existed in 0.7
> > but
> > > > not
> > > > > > > 0.8. In our use case we have something useful to do if we can
> > > detect
> > > > > > > messages failing to be sent.
> > > > > > >
> > > > > > > Christian
> > > > > > >
> > > > > >
> > > > > > It appears that I was looking at the Java client rather than the
> > > Scala
> > > > > > java api referenced by the documentation
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/javaapi/producer/Producer.scala
> > > > > >
> > > > > > Are both of these currently suited for use from java and still
> > > > > > supported? Given the support for callbacks in the event of
> failure
> > I
> > > am
> > > > > > inclined to use the Java one despite the currently limited
> support
> > > for
> > > > > > specifying partitioners (though it supports specifying the
> > partition)
> > > > or
> > > > > > encoders.
> > > > > >
> > > > > > Any guidance on this would be appreciated.
> > > > > >
> > > > > > Christian
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: 0.8.1 Java Producer API Callbacks

2014-05-02 Thread Neha Narkhede
Hi Christian,

As Jun mentioned, the new producer is marked beta since it is new. By 0.9
or even sooner, we'd expect it to be deployed at LinkedIn and a few other
companies in a stable state. The APIs, guarantees and features will not
change by 0.9, just the stability.

Thanks,
Neha


On Fri, May 2, 2014 at 7:50 AM, Jun Rao  wrote:

> It's beta mostly because it's new.
>
> Thanks,
>
> Jun
>
>
> On Thu, May 1, 2014 at 10:25 PM, cac...@gmail.com 
> wrote:
>
> > Thanks, that's quite helpful. According to this post,
> > http://blog.empathybox.com/ , it looks like it will be beta then which
> > seems good enough. Assuming that the beta designation is correct, is that
> > because it won't have as many features/the same flexibility as it's
> > expected to by 0.9?
> > Christian
> >
> >
> > On Thu, May 1, 2014 at 9:19 PM, Neha Narkhede  > >wrote:
> >
> > > The javadoc of the new producer is here for now -
> > >
> > >
> >
> http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> > >
> > > Thanks,
> > > Neha
> > >
> > >
> > > On Thu, May 1, 2014 at 9:16 PM, Jun Rao  wrote:
> > >
> > > > The new producer (that supports callbacks) is in trunk. It will be
> > > released
> > > > in 0.8.2. You can look at the java doc of KafkaProducer for the api.
> > > >
> > > > Thanks
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Thu, May 1, 2014 at 8:43 PM, Christian Csar 
> > wrote:
> > > >
> > > > > On 05/01/2014 07:22 PM, Christian Csar wrote:
> > > > > > I'm looking at using the java producer api for 0.8.1 and I'm
> > slightly
> > > > > > confused by this passage from section 4.4 of
> > > > > > https://kafka.apache.org/documentation.html#theproducer
> > > > > > "Note that as of Kafka 0.8.1 the async producer does not have a
> > > > > > callback, which could be used to register handlers to catch send
> > > > errors.
> > > > > > Adding such callback functionality is proposed for Kafka 0.9, see
> > > > > > [Proposed Producer
> > > > > > API](
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ProposedProducerAPI
> > > > )
> > > > > ."
> > > > > >
> > > > > > org.apache.kafka.clients.producer.KafkaProducer in 0.8.1 appears
> > > > > > to have public Future<RecordMetadata> send(ProducerRecord record,
> > > > > > Callback callback) which looks like the mentioned callback.
> > > > > >
> > > > > > How do the callbacks with the async producer? Is it as described
> in
> > > the
> > > > > > comment on the send method (see
> > > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L151
> > > > > > for reference)?
> > > > > >
> > > > > > Looking around it seems plausible the language in the
> documentation
> > > > > > might refer to a separate sort of callback that existed in 0.7
> but
> > > not
> > > > > > 0.8. In our use case we have something useful to do if we can
> > detect
> > > > > > messages failing to be sent.
> > > > > >
> > > > > > Christian
> > > > > >
> > > > >
> > > > > It appears that I was looking at the Java client rather than the
> > Scala
> > > > > java api referenced by the documentation
> > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/javaapi/producer/Producer.scala
> > > > >
> > > > > Are both of these currently suited for use from java and still
> > > > > supported? Given the support for callbacks in the event of failure
> I
> > am
> > > > > inclined to use the Java one despite the currently limited support
> > for
> > > > > specifying partitioners (though it supports specifying the
> partition)
> > > or
> > > > > encoders.
> > > > >
> > > > > Any guidance on this would be appreciated.
> > > > >
> > > > > Christian
> > > > >
> > > > >
> > > >
> > >
> >
>


Re: 0.8.1 Java Producer API Callbacks

2014-05-02 Thread Jun Rao
It's beta mostly because it's new.

Thanks,

Jun


On Thu, May 1, 2014 at 10:25 PM, cac...@gmail.com  wrote:

> Thanks, that's quite helpful. According to this post,
> http://blog.empathybox.com/ , it looks like it will be beta then which
> seems good enough. Assuming that the beta designation is correct, is that
> because it won't have as many features/the same flexibility as it's
> expected to by 0.9?
> Christian
>
>
> On Thu, May 1, 2014 at 9:19 PM, Neha Narkhede  >wrote:
>
> > The javadoc of the new producer is here for now -
> >
> >
> http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> >
> > Thanks,
> > Neha
> >
> >
> > On Thu, May 1, 2014 at 9:16 PM, Jun Rao  wrote:
> >
> > > The new producer (that supports callbacks) is in trunk. It will be
> > released
> > > in 0.8.2. You can look at the java doc of KafkaProducer for the api.
> > >
> > > Thanks
> > >
> > > Jun
> > >
> > >
> > > On Thu, May 1, 2014 at 8:43 PM, Christian Csar 
> wrote:
> > >
> > > > On 05/01/2014 07:22 PM, Christian Csar wrote:
> > > > > I'm looking at using the java producer api for 0.8.1 and I'm
> slightly
> > > > > confused by this passage from section 4.4 of
> > > > > https://kafka.apache.org/documentation.html#theproducer
> > > > > "Note that as of Kafka 0.8.1 the async producer does not have a
> > > > > callback, which could be used to register handlers to catch send
> > > errors.
> > > > > Adding such callback functionality is proposed for Kafka 0.9, see
> > > > > [Proposed Producer
> > > > > API](
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ProposedProducerAPI
> > > )
> > > > ."
> > > > >
> > > > > org.apache.kafka.clients.producer.KafkaProducer in 0.8.1 appears to
> > > > > have public Future<RecordMetadata> send(ProducerRecord record, Callback
> > > > > callback) which looks like the mentioned callback.
> > > > >
> > > > > How do the callbacks with the async producer? Is it as described in
> > the
> > > > > comment on the send method (see
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L151
> > > > > for reference)?
> > > > >
> > > > > Looking around it seems plausible the language in the documentation
> > > > > might refer to a separate sort of callback that existed in 0.7 but
> > not
> > > > > 0.8. In our use case we have something useful to do if we can
> detect
> > > > > messages failing to be sent.
> > > > >
> > > > > Christian
> > > > >
> > > >
> > > > It appears that I was looking at the Java client rather than the
> Scala
> > > > java api referenced by the documentation
> > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/javaapi/producer/Producer.scala
> > > >
> > > > Are both of these currently suited for use from java and still
> > > > supported? Given the support for callbacks in the event of failure I
> am
> > > > inclined to use the Java one despite the currently limited support
> for
> > > > specifying partitioners (though it supports specifying the partition)
> > or
> > > > encoders.
> > > >
> > > > Any guidance on this would be appreciated.
> > > >
> > > > Christian
> > > >
> > > >
> > >
> >
>


Re: 0.8.1 Java Producer API Callbacks

2014-05-01 Thread cac...@gmail.com
Thanks, that's quite helpful. According to this post,
http://blog.empathybox.com/ , it looks like it will be beta then which
seems good enough. Assuming that the beta designation is correct, is that
because it won't have as many features/the same flexibility as it's
expected to by 0.9?
Christian


On Thu, May 1, 2014 at 9:19 PM, Neha Narkhede wrote:

> The javadoc of the new producer is here for now -
>
> http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
>
> Thanks,
> Neha
>
>
> On Thu, May 1, 2014 at 9:16 PM, Jun Rao  wrote:
>
> > The new producer (that supports callbacks) is in trunk. It will be
> released
> > in 0.8.2. You can look at the java doc of KafkaProducer for the api.
> >
> > Thanks
> >
> > Jun
> >
> >
> > On Thu, May 1, 2014 at 8:43 PM, Christian Csar  wrote:
> >
> > > On 05/01/2014 07:22 PM, Christian Csar wrote:
> > > > I'm looking at using the java producer api for 0.8.1 and I'm slightly
> > > > confused by this passage from section 4.4 of
> > > > https://kafka.apache.org/documentation.html#theproducer
> > > > "Note that as of Kafka 0.8.1 the async producer does not have a
> > > > callback, which could be used to register handlers to catch send
> > errors.
> > > > Adding such callback functionality is proposed for Kafka 0.9, see
> > > > [Proposed Producer
> > > > API](
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ProposedProducerAPI
> > )
> > > ."
> > > >
> > > > org.apache.kafka.clients.producer.KafkaProducer in 0.8.1 appears to
> > > > have public Future<RecordMetadata> send(ProducerRecord record, Callback
> > > > callback) which looks like the mentioned callback.
> > > >
> > > > How do the callbacks with the async producer? Is it as described in
> the
> > > > comment on the send method (see
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L151
> > > > for reference)?
> > > >
> > > > Looking around it seems plausible the language in the documentation
> > > > might refer to a separate sort of callback that existed in 0.7 but
> not
> > > > 0.8. In our use case we have something useful to do if we can detect
> > > > messages failing to be sent.
> > > >
> > > > Christian
> > > >
> > >
> > > It appears that I was looking at the Java client rather than the Scala
> > > java api referenced by the documentation
> > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/javaapi/producer/Producer.scala
> > >
> > > Are both of these currently suited for use from java and still
> > > supported? Given the support for callbacks in the event of failure I am
> > > inclined to use the Java one despite the currently limited support for
> > > specifying partitioners (though it supports specifying the partition)
> or
> > > encoders.
> > >
> > > Any guidance on this would be appreciated.
> > >
> > > Christian
> > >
> > >
> >
>


Re: 0.8.1 Java Producer API Callbacks

2014-05-01 Thread Neha Narkhede
The javadoc of the new producer is here for now -
http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html

Thanks,
Neha


On Thu, May 1, 2014 at 9:16 PM, Jun Rao  wrote:

> The new producer (that supports callbacks) is in trunk. It will be released
> in 0.8.2. You can look at the java doc of KafkaProducer for the api.
>
> Thanks
>
> Jun
>
>
> On Thu, May 1, 2014 at 8:43 PM, Christian Csar  wrote:
>
> > On 05/01/2014 07:22 PM, Christian Csar wrote:
> > > I'm looking at using the java producer api for 0.8.1 and I'm slightly
> > > confused by this passage from section 4.4 of
> > > https://kafka.apache.org/documentation.html#theproducer
> > > "Note that as of Kafka 0.8.1 the async producer does not have a
> > > callback, which could be used to register handlers to catch send
> errors.
> > > Adding such callback functionality is proposed for Kafka 0.9, see
> > > [Proposed Producer
> > > API](
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ProposedProducerAPI
> )
> > ."
> > >
> > > org.apache.kafka.clients.producer.KafkaProducer in 0.8.1 appears to
> have
> > > public Future<RecordMetadata> send(ProducerRecord record, Callback
> > > callback) which looks like the mentioned callback.
> > >
> > How do the callbacks work with the async producer? Is it as described in the
> > > comment on the send method (see
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L151
> > > for reference)?
> > >
> > > Looking around it seems plausible the language in the documentation
> > > might refer to a separate sort of callback that existed in 0.7 but not
> > > 0.8. In our use case we have something useful to do if we can detect
> > > messages failing to be sent.
> > >
> > > Christian
> > >
> >
> > It appears that I was looking at the Java client rather than the Scala
> > java api referenced by the documentation
> >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/javaapi/producer/Producer.scala
> >
> > Are both of these currently suited for use from java and still
> > supported? Given the support for callbacks in the event of failure I am
> > inclined to use the Java one despite the currently limited support for
> > specifying partitioners (though it supports specifying the partition) or
> > encoders.
> >
> > Any guidance on this would be appreciated.
> >
> > Christian
> >
> >
>


Re: 0.8.1 Java Producer API Callbacks

2014-05-01 Thread Jun Rao
The new producer (that supports callbacks) is in trunk. It will be released
in 0.8.2. You can look at the java doc of KafkaProducer for the api.

Thanks

Jun


On Thu, May 1, 2014 at 8:43 PM, Christian Csar  wrote:

> On 05/01/2014 07:22 PM, Christian Csar wrote:
> > I'm looking at using the java producer api for 0.8.1 and I'm slightly
> > confused by this passage from section 4.4 of
> > https://kafka.apache.org/documentation.html#theproducer
> > "Note that as of Kafka 0.8.1 the async producer does not have a
> > callback, which could be used to register handlers to catch send errors.
> > Adding such callback functionality is proposed for Kafka 0.9, see
> > [Proposed Producer
> > API](
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ProposedProducerAPI)
> ."
> >
> > org.apache.kafka.clients.producer.KafkaProducer in 0.8.1 appears to have
> > public Future<RecordMetadata> send(ProducerRecord record, Callback
> > callback) which looks like the mentioned callback.
> >
> > How do the callbacks work with the async producer? Is it as described in the
> > comment on the send method (see
> >
> https://github.com/apache/kafka/blob/0.8.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L151
> > for reference)?
> >
> > Looking around it seems plausible the language in the documentation
> > might refer to a separate sort of callback that existed in 0.7 but not
> > 0.8. In our use case we have something useful to do if we can detect
> > messages failing to be sent.
> >
> > Christian
> >
>
> It appears that I was looking at the Java client rather than the Scala
> java api referenced by the documentation
>
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/javaapi/producer/Producer.scala
>
> Are both of these currently suited for use from java and still
> supported? Given the support for callbacks in the event of failure I am
> inclined to use the Java one despite the currently limited support for
> specifying partitioners (though it supports specifying the partition) or
> encoders.
>
> Any guidance on this would be appreciated.
>
> Christian
>
>
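
For reference, a minimal sketch of registering such a callback, written
against the released 0.8.2 new-producer API that Jun points to (the broker
address and topic name below are placeholders, not values from this thread):

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallbackExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // send() is asynchronous; the callback fires once the broker acks or the send fails
        producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"),
            new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        // a non-null exception means the send failed after any retries
                        exception.printStackTrace();
                    } else {
                        System.out.println("acked: " + metadata.topic()
                            + "-" + metadata.partition() + " @ " + metadata.offset());
                    }
                }
            });

        producer.close();
    }
}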


Re: 0.8.1 Java Producer API Callbacks

2014-05-01 Thread Christian Csar
On 05/01/2014 07:22 PM, Christian Csar wrote:
> I'm looking at using the java producer api for 0.8.1 and I'm slightly
> confused by this passage from section 4.4 of
> https://kafka.apache.org/documentation.html#theproducer
> "Note that as of Kafka 0.8.1 the async producer does not have a
> callback, which could be used to register handlers to catch send errors.
> Adding such callback functionality is proposed for Kafka 0.9, see
> [Proposed Producer
> API](https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ProposedProducerAPI)."
> 
> org.apache.kafka.clients.producer.KafkaProducer in 0.8.1 appears to have
> public Future<RecordMetadata> send(ProducerRecord record, Callback
> callback) which looks like the mentioned callback.
> 
> How do the callbacks work with the async producer? Is it as described in the
> comment on the send method (see
> https://github.com/apache/kafka/blob/0.8.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L151
> for reference)?
> 
> Looking around it seems plausible the language in the documentation
> might refer to a separate sort of callback that existed in 0.7 but not
> 0.8. In our use case we have something useful to do if we can detect
> messages failing to be sent.
> 
> Christian
> 

It appears that I was looking at the Java client rather than the Scala
java api referenced by the documentation
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/javaapi/producer/Producer.scala

Are both of these currently suited for use from java and still
supported? Given the support for callbacks in the event of failure I am
inclined to use the Java one despite the currently limited support for
specifying partitioners (though it supports specifying the partition) or
encoders.

Any guidance on this would be appreciated.

Christian
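
For comparison, a minimal sketch of the Scala-backed kafka.javaapi.producer
API in 0.8.x that Christian mentions, where the encoder and partitioner are
supplied through config rather than callbacks (the broker list, topic, and
property values here are illustrative placeholders):

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholder brokers
props.put("serializer.class", "kafka.serializer.StringEncoder"); // message encoder
props.put("partitioner.class", "kafka.producer.DefaultPartitioner"); // partitions by key hash
props.put("request.required.acks", "1");

Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
// the key ("key") is handed to the partitioner to choose a partition
producer.send(new KeyedMessage<String, String>("my-topic", "key", "message"));
producer.close();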





0.8.1 Java Producer API Callbacks

2014-05-01 Thread Christian Csar
I'm looking at using the java producer api for 0.8.1 and I'm slightly
confused by this passage from section 4.4 of
https://kafka.apache.org/documentation.html#theproducer
"Note that as of Kafka 0.8.1 the async producer does not have a
callback, which could be used to register handlers to catch send errors.
Adding such callback functionality is proposed for Kafka 0.9, see
[Proposed Producer
API](https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ProposedProducerAPI)."

org.apache.kafka.clients.producer.KafkaProducer in 0.8.1 appears to have
public Future<RecordMetadata> send(ProducerRecord record, Callback
callback) which looks like the mentioned callback.

How do the callbacks work with the async producer? Is it as described in the
comment on the send method (see
https://github.com/apache/kafka/blob/0.8.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L151
for reference)?

Looking around it seems plausible the language in the documentation
might refer to a separate sort of callback that existed in 0.7 but not
0.8. In our use case we have something useful to do if we can detect
messages failing to be sent.

Christian





Re: Pattern for using kafka producer API

2014-02-10 Thread Jun Rao
If you are only worried about throughput, you can use one producer in async
mode. You can tune the batch size and time for better performance.

Thanks,

Jun


On Sun, Feb 9, 2014 at 11:42 PM, pushkar priyadarshi <
priyadarshi.push...@gmail.com> wrote:

> What is the most appropriate design for using the Kafka producer from a
> performance viewpoint? I had a few in mind.
>
> 1. Since a single Kafka producer object has synchronization, using a single
> producer object from multiple threads might not be efficient, so one way
> would be to use multiple Kafka producers from inside the same thread.
>
> 2. Have multiple threads, each having its own instance of the producer. This
> has thread overhead if Kafka internally uses the same semantics.
>
> It would be great if someone could comment on these approaches or suggest
> a widely used one.
> P.S. I'm using 0.8.0 and mostly concerned with the async producer.
>
> Thanks And Regards,
> Pushkar
>
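
As a rough illustration of the tuning Jun describes, an async configuration
for the 0.8.0 (Scala) producer might look like the sketch below; the numbers
are illustrative starting points, not recommendations:

Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092"); // placeholder broker
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("producer.type", "async");                 // buffer and send in the background
props.put("batch.num.messages", "200");              // flush after this many messages...
props.put("queue.buffering.max.ms", "500");          // ...or after this much time
props.put("queue.buffering.max.messages", "10000");  // bound on the in-memory queue

Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));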


Pattern for using kafka producer API

2014-02-09 Thread pushkar priyadarshi
What is the most appropriate design for using the Kafka producer from a
performance viewpoint? I had a few in mind.

1. Since a single Kafka producer object has synchronization, using a single
producer object from multiple threads might not be efficient, so one way
would be to use multiple Kafka producers from inside the same thread.

2. Have multiple threads, each having its own instance of the producer. This
has thread overhead if Kafka internally uses the same semantics.

It would be great if someone could comment on these approaches or suggest
a widely used one.
P.S. I'm using 0.8.0 and mostly concerned with the async producer.

Thanks And Regards,
Pushkar


Re: Questions about producer API

2013-10-31 Thread Roger Hoover
Thank you, Neha.  I mainly wanted to understand if it was because of
historical reasons or some fundamental reason.  I think managing
configuration will be simpler if both sides use ZooKeeper for discovery.

Great to hear about the client rewrite project going on.  Thanks to you and
the other contributors/committers for this great software.

Cheers,

Roger


On Wed, Oct 30, 2013 at 6:00 PM, Neha Narkhede wrote:

> Agree that it is somewhat awkward to use zookeeper for broker discovery on
> the consumer, but a broker list on the producer. There were a couple of
> discussions on the mailing list suggesting using zookeeper on the producer,
> at least for discovering the brokers for the first time. However, we are
> starting on the Client Rewrite project, which is targeted for 0.9. That is
> something we can consider changing in Kafka 0.9. If there is sufficient
> interest, we can look at making the zookeeper config change on the producer
> soon. But that is something to discuss on a JIRA.
>
> Thanks,
> Neha
>
>
> On Wed, Oct 30, 2013 at 9:53 AM, Roger Hoover  >wrote:
>
> > Hi,
> >
> > I'm still getting started with Kafka and was curious why there is an
> > asymmetry between the producer and consumer APIs.  Why does the producer
> > config take a list of brokers whereas the consumer config takes a
> > ZooKeeper connection?
> >
> > Thanks,
> >
> > Roger
> >
>


Re: Questions about producer API

2013-10-30 Thread Neha Narkhede
Agree that it is somewhat awkward to use zookeeper for broker discovery on
the consumer, but a broker list on the producer. There were a couple of
discussions on the mailing list suggesting using zookeeper on the producer,
at least for discovering the brokers for the first time. However, we are
starting on the Client Rewrite project, which is targeted for 0.9. That is
something we can consider changing in Kafka 0.9. If there is sufficient
interest, we can look at making the zookeeper config change on the producer
soon. But that is something to discuss on a JIRA.

Thanks,
Neha


On Wed, Oct 30, 2013 at 9:53 AM, Roger Hoover wrote:

> Hi,
>
> I'm still getting started with Kafka and was curious why there is an
> asymmetry between the producer and consumer APIs.  Why does the producer
> config take a list of brokers whereas the consumer config takes a
> ZooKeeper connection?
>
> Thanks,
>
> Roger
>
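
To make the asymmetry Neha describes concrete, here is a sketch of the two
0.8 config styles side by side (host names are placeholders):

// Producer: bootstraps from a static broker list.
Properties producerProps = new Properties();
producerProps.put("metadata.broker.list", "broker1:9092,broker2:9092");

// High-level consumer: discovers the brokers through ZooKeeper instead.
Properties consumerProps = new Properties();
consumerProps.put("zookeeper.connect", "zk1:2181,zk2:2181");
consumerProps.put("group.id", "my-group");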


Questions about producer API

2013-10-30 Thread Roger Hoover
Hi,

I'm still getting started with Kafka and was curious why there is an
asymmetry between the producer and consumer APIs.  Why does the producer
config take a list of brokers whereas the consumer config takes a
ZooKeeper connection?

Thanks,

Roger


RE: producer API thread safety

2013-10-04 Thread Yu, Libo
Great. Thanks.

Regards,

Libo


-Original Message-
From: Guozhang Wang [mailto:wangg...@gmail.com] 
Sent: Friday, October 04, 2013 12:27 PM
To: users@kafka.apache.org
Subject: Re: producer API thread safety

The send() is thread safe, so the short answer would be yes.


On Fri, Oct 4, 2013 at 9:14 AM, Yu, Libo  wrote:

> Hi team,
>
> Is it possible to use a single producer with more than one thread? I
> am not sure if its send() is thread safe.
>
> Regards,
>
> Libo
>
>


--
-- Guozhang


Re: producer API thread safety

2013-10-04 Thread Guozhang Wang
The send() is thread safe, so the short answer would be yes.


On Fri, Oct 4, 2013 at 9:14 AM, Yu, Libo  wrote:

> Hi team,
>
> Is it possible to use a single producer with more than one thread? I am
> not sure if its send() is thread safe.
>
> Regards,
>
> Libo
>
>


-- 
-- Guozhang
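
A minimal sketch of that pattern with the 0.8 (Scala) producer: one shared
instance with send() called from several threads (the broker address, topic,
and thread count are placeholders):

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SharedProducerExample {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092"); // placeholder broker
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        // One producer shared by every worker; send() is safe to call concurrently.
        final Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));

        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            final int id = i;
            pool.submit(new Runnable() {
                public void run() {
                    producer.send(new KeyedMessage<String, String>("my-topic", "from worker " + id));
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS); // let workers finish before closing
        producer.close();
    }
}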


producer API thread safety

2013-10-04 Thread Yu, Libo
Hi team,

Is it possible to use a single producer with more than one thread? I am not
sure if its send() is thread safe.

Regards,

Libo