So for each message that I send asynchronously, I have to create a new
callback instance that holds on to the message?
This looks nice in theory, but at a few thousand requests/sec it could use
too much extra memory and put too much pressure on the garbage collector,
especially if the connection breaks for a few seconds and all of this piles
up as a result of retry logic.

I guess I could pool callbacks and do something like setMessage() on the
callback, but that looks like an attempt to work around limitations of the
API. I'd prefer to create one callback instance per app or per thread and
reuse it. Since the Kafka producer already has the messages in the batch and
knows which batch failed, it could pass the message to the onError() callback.
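
To make the idea concrete, here is a minimal sketch of what I have in mind.
None of these names exist in the proposed API: SendCallback,
ToyBatchingProducer and ReplayLog are made up for illustration, and the toy
producer only simulates a failed batch instead of doing any network I/O.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical callback shape: the producer hands the failed message back. */
interface SendCallback<M> {
    void onError(M message, Exception cause);
}

/** Toy stand-in for a producer that already buffers messages per batch. */
class ToyBatchingProducer<M> {
    private final List<M> batch = new ArrayList<>();
    private final SendCallback<M> callback; // one instance shared by every send

    ToyBatchingProducer(SendCallback<M> callback) {
        this.callback = callback;
    }

    /** Buffers the message; no per-message callback object is allocated. */
    void send(M message) {
        batch.add(message);
    }

    /** Simulates a failed send: every buffered message goes to the callback. */
    void failBatch(Exception cause) {
        for (M message : batch) {
            callback.onError(message, cause);
        }
        batch.clear();
    }

    /** Number of messages still buffered. */
    int pending() {
        return batch.size();
    }
}

/** Collects failed messages so they can be replayed later. */
class ReplayLog implements SendCallback<String> {
    final List<String> lines = new ArrayList<>();

    @Override
    public void onError(String message, Exception cause) {
        lines.add(message);
    }
}
```

With this shape the application creates one ReplayLog up front and passes it
to the producer once. The only per-message state is the message itself, which
the batch holds anyway.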

Am I over-thinking this?


----------
Andrey Yegorov


On Fri, Jan 24, 2014 at 1:15 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> If I understand your use case I think usage would be something like
>
>   producer.send(message, new Callback() {
>     public void onCompletion(RecordSend send) {
>        if(send.hasError())
>           log.write(message);
>     }
>   });
>
> Reasonable?
>
> In other words you can include references to any variables you like in the
> callback. We could provide the message for you but that would require us to
> hang on to the message object for the duration of the call which has memory
> implications so I think it is better for people to only do this if they
> want to use it.
>
> -Jay
>
>
> On Fri, Jan 24, 2014 at 1:05 PM, Andrey Yegorov <andrey.yego...@gmail.com> wrote:
>
> > I love the callback in send() but I do not see how it helps in case of an
> > error.
> >
> > Imagine the use case: I want to write messages to the log so I can replay
> > them to kafka later if the async send failed.
> > From a brief look at the API I see that I'll get back a RecordSend object
> > (which is already a misnomer - nothing was sent in case of an error). From
> > that object I can get some info about the error and offset. How do I get
> > the original message back so I can write it to the log? Can you please
> > provide an example?
> >
> >
> >
> > ----------
> > Andrey Yegorov
> >
> >
> > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> > > As mentioned in a previous email we are working on a re-implementation
> > > of the producer. I would like to use this email thread to discuss the
> > > details of the public API and the configuration. I would love for us to
> > > be incredibly picky about this public api now so it is as good as
> > > possible and we don't need to break it in the future.
> > >
> > > The best way to get a feel for the API is actually to take a look at the
> > > javadoc; my hope is to get the api docs good enough that they are
> > > self-explanatory:
> > >
> > > http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> > >
> > > Please take a look at this API and give me any thoughts you may have!
> > >
> > > It may also be reasonable to take a look at the configs:
> > >
> > > http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
> > >
> > > The actual code is posted here:
> > > https://issues.apache.org/jira/browse/KAFKA-1227
> > >
> > > A few questions or comments to kick things off:
> > > 1. We need to make a decision on whether serialization of the user's key
> > > and value should be done by the user (with our api just taking byte[]) or
> > > if we should take an object and allow the user to configure a Serializer
> > > class which we instantiate via reflection. We take the latter approach in
> > > the current producer, and I have carried this through to this prototype.
> > > The tradeoff I see is this: taking byte[] is actually simpler, since the
> > > user can directly do whatever serialization they like. The complication
> > > is actually partitioning. Currently partitioning is done by a similar
> > > plug-in api (Partitioner) which the user can implement and configure to
> > > override how partitions are assigned. If we take byte[] as input then we
> > > have no access to the original object and partitioning MUST be done on
> > > the byte[]. This is fine for hash partitioning. However for various types
> > > of semantic partitioning (range partitioning, or whatever) you would want
> > > access to the original object. In the current approach a producer who
> > > wishes to send byte[] they have serialized in their own code can
> > > configure the BytesSerialization we supply which is just a "no op"
> > > serialization.
> > > 2. We should obsess over naming and make sure each of the class names is
> > > good.
> > > 3. Jun has already pointed out that we need to include the topic and
> > > partition in the response, which is absolutely right. I haven't done that
> > > yet but that definitely needs to be there.
> > > 4. Currently RecordSend.await will throw an exception if the request
> > > failed. The intention here is that producer.send(message).await() exactly
> > > simulates a synchronous call. Guozhang has noted that this is a little
> > > annoying since the user must then catch exceptions. However if we remove
> > > this then if the user doesn't check for errors they won't know one has
> > > occurred, which I predict will be a common mistake.
> > > 5. Perhaps there is more we could do to make the async callbacks and
> > > future we give back intuitive and easy to program against?
> > >
> > > Some background info on implementation:
> > >
> > > At a high level the primary difference in this producer is that it
> > > removes the distinction between the "sync" and "async" producer.
> > > Effectively all requests are sent asynchronously but always return a
> > > future response object that gives the offset as well as any error that
> > > may have occurred when the request is complete. The batching that is done
> > > in the async producer only today is done whenever possible now. This
> > > means that the sync producer, under load, can get performance as good as
> > > the async producer (preliminary results show the producer getting 1m
> > > messages/sec). This works similarly to group commit in databases but with
> > > respect to the actual network transmission: any messages that arrive
> > > while a send is in progress are batched together. It is also possible to
> > > encourage batching even under low load to save server resources by
> > > introducing a delay on the send to allow more messages to accumulate;
> > > this is done using the linger.ms config (this is similar to Nagle's
> > > algorithm in TCP).
> > >
> > > This producer does all network communication asynchronously and in
> > > parallel to all servers so the performance penalty for acks=-1 and
> > > waiting on replication should be much reduced. I haven't done much
> > > benchmarking on this yet, though.
> > >
> > > The high level design is described a little here, though this is now a
> > > little out of date:
> > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > >
> > > -Jay
> > >
> >
>
