Andrey,

I think this should perform okay. We already create a number of objects per message sent, so one more shouldn't have much performance impact at just thousands of messages per second.
-Jay

On Fri, Jan 24, 2014 at 2:28 PM, Andrey Yegorov <andrey.yego...@gmail.com> wrote:

> So for each message that I need to send asynchronously, I have to create a new callback instance and hold on to the message?
> This looks nice in theory, but at a few thousand requests/sec it could use up too much extra memory and put too much pressure on the garbage collector, especially if the connection breaks for a few seconds and all of this piles up as a result of the retry logic.
>
> I guess I could pool callbacks and call something like setMessage() on each one, but that looks like an attempt to work around a limitation of the API.
> I'd prefer to create one callback instance per app or per thread and reuse it. Since the Kafka producer already has the messages in the batch and knows which batch failed, it could pass the message to the onError() callback.
>
> Am I over-thinking this?
>
> ----------
> Andrey Yegorov
>
> On Fri, Jan 24, 2014 at 1:15 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > If I understand your use case, I think usage would be something like
> >
> > producer.send(message, new Callback() {
> >     public void onCompletion(RecordSend send) {
> >         if(send.hasError())
> >             log.write(message);
> >     }
> > });
> >
> > Reasonable?
> >
> > In other words, you can include references to any variables you like in the callback. We could provide the message for you, but that would require us to hang on to the message object for the duration of the call, which has memory implications, so I think it is better for people to do this only if they want to use it.
> >
> > -Jay
> >
> > On Fri, Jan 24, 2014 at 1:05 PM, Andrey Yegorov <andrey.yego...@gmail.com> wrote:
> >
> > > I love the callback in send(), but I do not see how it helps in case of an error.
> > >
> > > Imagine the use case: I want to write messages to a log so I can replay them to Kafka later if an async send fails.
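The replay-log use case above is what Jay's callback snippet in the 1:15 PM reply addresses: the callback simply captures the message it was created for. A minimal runnable sketch of that capture pattern follows. `RecordSend`, `Callback`, `FakeProducer`, and `ReplayLog` are hypothetical stand-ins written for this example (not the prototype's classes); the fake producer "fails" any message containing "bad" and invokes the callback synchronously.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the prototype types discussed in this thread;
// they are NOT the real Kafka client API.
interface RecordSend {
    boolean hasError();
}

interface Callback {
    void onCompletion(RecordSend send);
}

// Fake producer: "fails" any message containing "bad" and, for simplicity,
// invokes the callback synchronously instead of from an I/O thread.
class FakeProducer {
    void send(String message, Callback callback) {
        boolean failed = message.contains("bad");
        callback.onCompletion(() -> failed);
    }
}

// Collects failed messages so they can be replayed to the broker later.
class ReplayLog {
    final List<String> entries = new ArrayList<>();

    void write(String message) {
        entries.add(message);
    }
}

class CallbackCaptureDemo {
    public static void main(String[] args) {
        FakeProducer producer = new FakeProducer();
        ReplayLog log = new ReplayLog();
        for (String message : new String[] {"ok-1", "bad-2", "ok-3"}) {
            // One small callback per send; it captures `message`, so the
            // original record is still available when the send completes.
            producer.send(message, send -> {
                if (send.hasError()) {
                    log.write(message);
                }
            });
        }
        System.out.println(log.entries); // [bad-2]
    }
}
```

The per-send cost is one small lambda object holding a reference to its message, which is the allocation Andrey is asking about; the producer itself never has to retain messages that no caller wants back.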
> > >
> > > From a brief look at the API, I see that I'll get back a RecordSend object (which is already inaccurate, since nothing was sent in the case of an error). From that object I can get some info about the error and the offset. How do I get the original message back so I can write it to the log? Can you please provide an example?
> > >
> > > ----------
> > > Andrey Yegorov
> > >
> > > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >
> > > > As mentioned in a previous email, we are working on a re-implementation of the producer. I would like to use this email thread to discuss the details of the public API and the configuration. I would love for us to be incredibly picky about this public API now so it is as good as possible and we don't need to break it in the future.
> > > >
> > > > The best way to get a feel for the API is actually to take a look at the javadoc; my hope is to get the API docs good enough that they are self-explanatory:
> > > >
> > > > http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> > > >
> > > > Please take a look at this API and give me any thoughts you may have!
> > > >
> > > > It may also be reasonable to take a look at the configs:
> > > >
> > > > http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
> > > >
> > > > The actual code is posted here:
> > > > https://issues.apache.org/jira/browse/KAFKA-1227
> > > >
> > > > A few questions or comments to kick things off:
> > > > 1. We need to decide whether serialization of the user's key and value should be done by the user (with our API just taking byte[]) or whether we should take an object and allow the user to configure a Serializer class, which we instantiate via reflection.
> > > > We take the latter approach in the current producer, and I have carried this through to this prototype. The tradeoff I see is this: taking byte[] is actually simpler, since the user can directly do whatever serialization they like. The complication is partitioning. Currently partitioning is done by a similar plug-in API (Partitioner), which the user can implement and configure to override how partitions are assigned. If we take byte[] as input, then we have no access to the original object and partitioning MUST be done on the byte[]. This is fine for hash partitioning. However, for various types of semantic partitioning (range partitioning, or whatever) you would want access to the original object. In the current approach, a producer who wishes to send byte[] they have serialized in their own code can configure the BytesSerialization we supply, which is just a "no op" serialization.
> > > > 2. We should obsess over naming and make sure each of the class names is good.
> > > > 3. Jun has already pointed out that we need to include the topic and partition in the response, which is absolutely right. I haven't done that yet, but it definitely needs to be there.
> > > > 4. Currently RecordSend.await will throw an exception if the request failed. The intention here is that producer.send(message).await() exactly simulates a synchronous call. Guozhang has noted that this is a little annoying, since the user must then catch exceptions. However, if we remove this, then users who don't check for errors won't know one has occurred, which I predict will be a common mistake.
> > > > 5. Perhaps there is more we could do to make the async callbacks and the future we give back intuitive and easy to program against?
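To make the tradeoff in item 1 concrete, here is a sketch under assumed interfaces: `Serializer`, `Partitioner`, `Router`, and all the concrete classes are hypothetical names invented for illustration, not the prototype's API. Hash partitioning needs only the serialized bytes, while the range partitioner needs the original `Long` key, which a byte[]-only API could not supply without deserializing. `NoOpBytesSerializer` plays the role of the "no op" BytesSerialization mentioned above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical plug-in interfaces for illustration only; NOT the prototype's signatures.
interface Serializer<T> {
    byte[] toBytes(T value);
}

interface Partitioner<K> {
    // Receives both the original key and its serialized form.
    int partition(K key, byte[] serializedKey, int numPartitions);
}

// "No op" serializer for callers who already serialized their data to byte[].
class NoOpBytesSerializer implements Serializer<byte[]> {
    public byte[] toBytes(byte[] value) {
        return value;
    }
}

class StringSerializer implements Serializer<String> {
    public byte[] toBytes(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }
}

class LongSerializer implements Serializer<Long> {
    public byte[] toBytes(Long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }
}

// Hash partitioning only needs the bytes, so it works even with a byte[]-only API.
class BytesHashPartitioner<K> implements Partitioner<K> {
    public int partition(K key, byte[] serializedKey, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(serializedKey), numPartitions);
    }
}

// Range partitioning needs the original object: IDs assumed to lie in [0, maxId)
// are split into contiguous ranges, one per partition.
class UserIdRangePartitioner implements Partitioner<Long> {
    private final long maxId;

    UserIdRangePartitioner(long maxId) {
        this.maxId = maxId;
    }

    public int partition(Long key, byte[] serializedKey, int numPartitions) {
        long rangeSize = (maxId + numPartitions - 1) / numPartitions; // ceiling division
        return (int) (key / rangeSize);
    }
}

class Router {
    // Serialize first, then let the partitioner see both forms of the key.
    static <K> int route(K key, Serializer<K> serializer, Partitioner<K> partitioner, int numPartitions) {
        byte[] bytes = serializer.toBytes(key);
        return partitioner.partition(key, bytes, numPartitions);
    }
}

class SerializerPartitionerDemo {
    public static void main(String[] args) {
        UserIdRangePartitioner range = new UserIdRangePartitioner(1000);
        System.out.println(Router.route(10L, new LongSerializer(), range, 4));  // 0
        System.out.println(Router.route(999L, new LongSerializer(), range, 4)); // 3
        int p = Router.route("user-42", new StringSerializer(), new BytesHashPartitioner<String>(), 8);
        System.out.println(p >= 0 && p < 8); // true
    }
}
```

Handing the partitioner both the key and its bytes is only one possible design; the point it illustrates is that semantic partitioning needs the object, not just the serialized form.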
> > > >
> > > > Some background info on the implementation:
> > > >
> > > > At a high level, the primary difference in this producer is that it removes the distinction between the "sync" and "async" producers. Effectively, all requests are sent asynchronously, but every send returns a future response object that, once the request is complete, gives the offset as well as any error that may have occurred. The batching that today is done only in the async producer is now done whenever possible. This means that the sync producer, under load, can get performance as good as the async producer (preliminary results show the producer getting 1M messages/sec). This works similarly to group commit in databases, but with respect to the actual network transmission: any messages that arrive while a send is in progress are batched together. It is also possible to encourage batching even under low load, to save server resources, by introducing a delay on the send to allow more messages to accumulate; this is done using the linger.ms config (this is similar to Nagle's algorithm in TCP).
> > > >
> > > > This producer does all network communication asynchronously and in parallel to all servers, so the performance penalty for acks=-1 and waiting on replication should be much reduced. I haven't done much benchmarking on this yet, though.
> > > >
> > > > The high-level design is described briefly here, though this page is now a little out of date:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > > >
> > > > -Jay
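The batching behavior described above (group commit while a send is in flight, plus a linger delay under low load) can be modeled with a toy, single-threaded accumulator. This is a sketch under stated assumptions, not the prototype's implementation: time is passed in explicitly, at most one request is in flight at a time, `lingerMs` stands in for the linger.ms config, and `batchSize` and `rttMs` are hypothetical knobs.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the batching described in the thread; NOT the prototype's code.
// batchSize and rttMs are hypothetical knobs; lingerMs plays the role of linger.ms.
class BatchAccumulator {
    private final int batchSize;
    private final long lingerMs;
    private final long rttMs;
    private final List<String> pending = new ArrayList<>();
    private long firstAppendMs;
    private long inFlightUntilMs = 0;
    final List<List<String>> sent = new ArrayList<>();

    BatchAccumulator(int batchSize, long lingerMs, long rttMs) {
        this.batchSize = batchSize;
        this.lingerMs = lingerMs;
        this.rttMs = rttMs;
    }

    void append(String record, long nowMs) {
        if (pending.isEmpty()) {
            firstAppendMs = nowMs; // the linger clock starts with the first record in a batch
        }
        pending.add(record);
        poll(nowMs);
    }

    // Sender-loop step: send the pending batch if no request is in flight and the
    // batch is either full or has lingered long enough.
    void poll(long nowMs) {
        if (pending.isEmpty() || nowMs < inFlightUntilMs) {
            return; // nothing to send, or a send is in progress, so keep accumulating
        }
        boolean full = pending.size() >= batchSize;
        boolean lingered = nowMs - firstAppendMs >= lingerMs;
        if (full || lingered) {
            sent.add(new ArrayList<>(pending));
            pending.clear();
            inFlightUntilMs = nowMs + rttMs; // simulated network round trip
        }
    }
}

class BatchingDemo {
    public static void main(String[] args) {
        // Group-commit effect: no linger, 10 ms round trip.
        BatchAccumulator groupCommit = new BatchAccumulator(100, 0, 10);
        groupCommit.append("a", 0); // sent immediately
        groupCommit.append("b", 2); // arrives while "a" is in flight
        groupCommit.append("c", 5);
        groupCommit.poll(10);       // previous send done: "b" and "c" go together
        System.out.println(groupCommit.sent); // [[a], [b, c]]

        // Linger effect under low load: wait up to 5 ms for more records.
        BatchAccumulator linger = new BatchAccumulator(100, 5, 0);
        linger.append("x", 0);
        linger.append("y", 3);
        linger.poll(5);
        System.out.println(linger.sent); // [[x, y]]
    }
}
```

In the first scenario, records b and c arrive while a is in flight and leave together as soon as it completes; in the second, a 5 ms linger holds x back until y can join it, trading a little latency for fewer requests.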