Andrey,

I think this should perform okay. We already create a number of objects per
message sent, one more shouldn't have too much performance impact if it is
just thousands per second.

-Jay


On Fri, Jan 24, 2014 at 2:28 PM, Andrey Yegorov <andrey.yego...@gmail.com>wrote:

> So for each message that I need to send asynchronously I have to create a
> new instance of callback and hold on to the message?
> This looks nice in theory but in case of few thousands of request/sec this
> could use up too much extra memory and push too much to garbage collector,
> especially in case connection breaks for a few seconds and all this piles
> up as result of retry logic.
>
> I guess I can pool callbacks and do something like setMessage() on callback
> but this looks like an attempt to workaround limitations of the API.
> I'd prefer to create one instance of callback per app or per thread and
> reuse it. Since kafka producer already have messages in the batch and knows
> the batch that failed, it can pass the message to the onError() callback.
>
> Am I over-thinking this?
>
>
> ----------
> Andrey Yegorov
>
>
> On Fri, Jan 24, 2014 at 1:15 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > If I understand your use case I think usage would be something like
> >
> >   producer.send(message, new Callback() {
> >     public void onCompletion(RecordSend send) {
> >        if(send.hasError())
> >           log.write(message);
> >     }
> >   });
> >
> > Reasonable?
> >
> > In other words you can include references to any variables you like in
> the
> > callback. We could provide the message for you but that would require us
> to
> > hang on to the message object for the duration of the call which has
> memory
> > implications so I think it is better for people to only do this if they
> > want to use it.
> >
> > -Jay
> >
> >
> > On Fri, Jan 24, 2014 at 1:05 PM, Andrey Yegorov <
> andrey.yego...@gmail.com
> > >wrote:
> >
> > > I love the callback in send() but I do not see how it helps in case of
> an
> > > error.
> > >
> > > Imagine the usecase: I want to write messages to the log so I can
> replay
> > > them to kafka later in case if async send failed.
> > > From a brief look at the API I see that I'll get back RecordSend object
> > > (which is not true already - it was not send in case of error.) From
> that
> > > object I can get some info about the error and offset. How d I get the
> > > original message back so I can write it to the log? Can you please
> > provide
> > > an example?
> > >
> > >
> > >
> > > ----------
> > > Andrey Yegorov
> > >
> > >
> > > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com>
> wrote:
> > >
> > > > As mentioned in a previous email we are working on a
> re-implementation
> > of
> > > > the producer. I would like to use this email thread to discuss the
> > > details
> > > > of the public API and the configuration. I would love for us to be
> > > > incredibly picky about this public api now so it is as good as
> possible
> > > and
> > > > we don't need to break it in the future.
> > > >
> > > > The best way to get a feel for the API is actually to take a look at
> > the
> > > > javadoc, my hope is to get the api docs good enough so that it is
> > > > self-explanatory:
> > > >
> > > >
> > >
> >
> http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> > > >
> > > > Please take a look at this API and give me any thoughts you may have!
> > > >
> > > > It may also be reasonable to take a look at the configs:
> > > >
> > > >
> > >
> >
> http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
> > > >
> > > > The actual code is posted here:
> > > > https://issues.apache.org/jira/browse/KAFKA-1227
> > > >
> > > > A few questions or comments to kick things off:
> > > > 1. We need to make a decision on whether serialization of the user's
> > key
> > > > and value should be done by the user (with our api just taking
> byte[])
> > or
> > > > if we should take an object and allow the user to configure a
> > Serializer
> > > > class which we instantiate via reflection. We take the later approach
> > in
> > > > the current producer, and I have carried this through to this
> > prototype.
> > > > The tradeoff I see is this: taking byte[] is actually simpler, the
> user
> > > can
> > > > directly do whatever serialization they like. The complication is
> > > actually
> > > > partitioning. Currently partitioning is done by a similar plug-in api
> > > > (Partitioner) which the user can implement and configure to override
> > how
> > > > partitions are assigned. If we take byte[] as input then we have no
> > > access
> > > > to the original object and partitioning MUST be done on the byte[].
> > This
> > > is
> > > > fine for hash partitioning. However for various types of semantic
> > > > partitioning (range partitioning, or whatever) you would want access
> to
> > > the
> > > > original object. In the current approach a producer who wishes to
> send
> > > > byte[] they have serialized in their own code can configure the
> > > > BytesSerialization we supply which is just a "no op" serialization.
> > > > 2. We should obsess over naming and make sure each of the class names
> > are
> > > > good.
> > > > 3. Jun has already pointed out that we need to include the topic and
> > > > partition in the response, which is absolutely right. I haven't done
> > that
> > > > yet but that definitely needs to be there.
> > > > 4. Currently RecordSend.await will throw an exception if the request
> > > > failed. The intention here is that producer.send(message).await()
> > exactly
> > > > simulates a synchronous call. Guozhang has noted that this is a
> little
> > > > annoying since the user must then catch exceptions. However if we
> > remove
> > > > this then if the user doesn't check for errors they won't know one
> has
> > > > occurred, which I predict will be a common mistake.
> > > > 5. Perhaps there is more we could do to make the async callbacks and
> > > future
> > > > we give back intuitive and easy to program against?
> > > >
> > > > Some background info on implementation:
> > > >
> > > > At a high level the primary difference in this producer is that it
> > > removes
> > > > the distinction between the "sync" and "async" producer. Effectively
> > all
> > > > requests are sent asynchronously but always return a future response
> > > object
> > > > that gives the offset as well as any error that may have occurred
> when
> > > the
> > > > request is complete. The batching that is done in the async producer
> > only
> > > > today is done whenever possible now. This means that the sync
> producer,
> > > > under load, can get performance as good as the async producer
> > > (preliminary
> > > > results show the producer getting 1m messages/sec). This works
> similar
> > to
> > > > group commit in databases but with respect to the actual network
> > > > transmission--any messages that arrive while a send is in progress
> are
> > > > batched together. It is also possible to encourage batching even
> under
> > > low
> > > > load to save server resources by introducing a delay on the send to
> > allow
> > > > more messages to accumulate; this is done using the linger.ms config
> > > (this
> > > > is similar to Nagle's algorithm in TCP).
> > > >
> > > > This producer does all network communication asynchronously and in
> > > parallel
> > > > to all servers so the performance penalty for acks=-1 and waiting on
> > > > replication should be much reduced. I haven't done much benchmarking
> on
> > > > this yet, though.
> > > >
> > > > The high level design is described a little here, though this is now
> a
> > > > little out of date:
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > > >
> > > > -Jay
> > > >
> > >
> >
>

Reply via email to