How about the following use case:

Just before the producer actually sends the payload to Kafka, could an
event be exposed that would allow one to loop through the messages and
potentially delete some of them?

Example:

Say you have 100 messages, but before you send these messages to Kafka,
you can easily aggregate many of them to reduce the message count. If
there are messages that store counts, you could aggregate these into a
single message and then send that to Kafka.
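
To make it concrete, here is a rough sketch of the kind of hook I mean
(the API is completely made up -- "Record" stands for the topic/key/value
holder discussed further down the thread):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical hook the producer would invoke just before writing a
    // batch to the socket.
    public interface PreSendHook {
        // Return the (possibly smaller) list of records to actually send.
        List<Record> beforeSend(List<Record> batch);
    }

    // Example: collapse all records carrying a count for the same key
    // into a single record holding the summed count.
    public abstract class CountAggregatingHook implements PreSendHook {
        public List<Record> beforeSend(List<Record> batch) {
            Map<String, Long> totals = new HashMap<String, Long>();
            for (Record r : batch)
                totals.merge(keyOf(r), countOf(r), Long::sum);
            List<Record> out = new ArrayList<Record>();
            for (Map.Entry<String, Long> e : totals.entrySet())
                out.add(countRecord(e.getKey(), e.getValue()));
            return out; // 100 input records can come out as far fewer
        }

        // Application-supplied (de)serialization of the count payloads;
        // these helpers are hypothetical.
        protected abstract String keyOf(Record r);
        protected abstract long countOf(Record r);
        protected abstract Record countRecord(String key, long total);
    }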

Thoughts?



On Wed, Feb 5, 2014 at 2:03 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> It might. I considered this but ended up going this way. Now that we have
> changed partitionKey=>partition it almost works. The difference is that the
> consumer also gets an offset, which the producer doesn't have.
>
> One thing I think this points to is the value of getting the consumer Java
> API worked out, even in the absence of an implementation, just so we can
> write some fake code that uses both and kind of see how it feels.
>
> -Jay
>
>
> On Wed, Feb 5, 2014 at 10:23 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>
> > Currently, the user will send ProducerRecords using the new producer. The
> > expectation will be that you get the same thing as output from the
> > consumer. Since ProducerRecord is a holder for topic, partition, key and
> > value, does it make sense to rename it to just Record? So, the
> > send/receive APIs would look like the following -
> >
> > producer.send(Record record);
> > List<Record> poll();
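> >
> > i.e. the exact same holder flows through both ends -- something like
> > (just a sketch):
> >
> >   producer.send(new Record("clicks", null, "user-7".getBytes(), "1".getBytes()));
> >   ...
> >   List<Record> records = consumer.poll(); // the same Record type comes back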
> >
> > Thoughts?
> >
> >
> > On Sun, Feb 2, 2014 at 4:12 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > I think the most common motivation for having a customized partitioner
> > > is to make sure some messages always go to the same partition, but
> > > people may seldom want to know exactly which partition they go to. If
> > > that is true, why not just assign the same byte array as the partition
> > > key with the default hash-based partitioning in option 1.A? But again,
> > > that is based on my presumption that very few users would want to
> > > really specify the partition id.
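> > >
> > > For illustration (a sketch, reusing the Record holder from above): two
> > > sends with the same key land on the same partition via hash(key) %
> > > numPartitions, without the caller ever naming a partition:
> > >
> > >   byte[] key = "user-7".getBytes();
> > >   producer.send(new Record("clicks", null, key, "1".getBytes()));
> > >   producer.send(new Record("clicks", null, key, "2".getBytes())); // same partition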
> > >
> > >
> > >
> > > On Fri, Jan 31, 2014 at 2:44 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >
> > > > Hey Tom,
> > > >
> > > > Agreed, there is definitely nothing that prevents our including
> > > > partitioner implementations, but it does get a little less seamless.
> > > >
> > > > -Jay
> > > >
> > > >
> > > > On Fri, Jan 31, 2014 at 2:35 PM, Tom Brown <tombrow...@gmail.com> wrote:
> > > >
> > > > > Regarding partitioning APIs, I don't think there is a common subset
> > > > > of information that is required for all strategies. Instead of
> > > > > modifying the core API to easily support all of the various
> > > > > partitioning strategies, offer the most common ones as libraries
> > > > > that users can build into their own data pipeline, just like
> > > > > serialization. The core API would simply accept a partition index.
> > > > > You could include one default strategy (random) that only applies
> > > > > if they set "-1" for the partition index.
> > > > >
> > > > > That way, each partitioning strategy could have its own API that
> > > > > makes sense for it. For example, a round-robin partitioner only
> > > > > needs one method, "nextPartition()", while a hash-based one needs
> > > > > "getPartitionFor(byte[])".
> > > > >
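> > > > > In code, that might look like (rough sketch; the constructors and
> > > > > usage are made up):
> > > > >
> > > > >   import java.util.Arrays;
> > > > >   import java.util.concurrent.atomic.AtomicInteger;
> > > > >
> > > > >   // Lives outside the core API, just like serialization.
> > > > >   public class RoundRobinPartitioner {
> > > > >       private final AtomicInteger counter = new AtomicInteger(0);
> > > > >       private final int numPartitions;
> > > > >       public RoundRobinPartitioner(int numPartitions) {
> > > > >           this.numPartitions = numPartitions;
> > > > >       }
> > > > >       public int nextPartition() {
> > > > >           return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
> > > > >       }
> > > > >   }
> > > > >
> > > > >   public class HashPartitioner {
> > > > >       private final int numPartitions;
> > > > >       public HashPartitioner(int numPartitions) {
> > > > >           this.numPartitions = numPartitions;
> > > > >       }
> > > > >       public int getPartitionFor(byte[] key) {
> > > > >           return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
> > > > >       }
> > > > >   }
> > > > >
> > > > >   // The core API only ever sees the resulting index, e.g.:
> > > > >   //   producer.send(topic, partitioner.getPartitionFor(key), key, value);
> > > > >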
> > > > > For those who actually need a pluggable strategy, a superset of the
> > > > > API could be codified into an interface (perhaps the existing
> > > > > partitioner interface), but it would still have to be used from
> > > > > outside of the core API.
> > > > >
> > > > > This design would make the core API less confusing (when do I use a
> > > > > partition key instead of a partition index, does the key overwrite
> > > > > the index, can the key be null, etc.?) while still providing the
> > > > > flexibility you want.
> > > > >
> > > > > --Tom
> > > > >
> > > > > On Fri, Jan 31, 2014 at 12:07 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > >
> > > > > > Oliver,
> > > > > >
> > > > > > Yeah that was my original plan--allow the registration of multiple
> > > > > > callbacks on the future. But there is some additional
> > > > > > implementation complexity because then you need more
> > > > > > synchronization variables to ensure the callback gets executed
> > > > > > even if the request has completed at the time the callback is
> > > > > > registered. This also makes the order of callback execution
> > > > > > unpredictable--I want to be able to guarantee that, for a
> > > > > > particular partition, callbacks for lower offset messages happen
> > > > > > before callbacks for higher offset messages, so that if you set a
> > > > > > highwater mark or something it is easy to reason about. This has
> > > > > > the added benefit that callbacks ALWAYS execute in the I/O thread
> > > > > > instead of it being non-deterministic, which is a little
> > > > > > confusing.
> > > > > >
> > > > > > I thought a single callback is sufficient since you can always
> > > > > > include multiple actions in that callback, and I think that case
> > > > > > is rare anyway.
> > > > > >
> > > > > > I did think about the possibility of adding a thread pool for
> > > > > > handling the callbacks. But there are a lot of possible
> > > > > > configurations for such a thread pool, and a simplistic approach
> > > > > > would no longer guarantee in-order processing of callbacks (you
> > > > > > would need to hash execution over threads by partition id). I
> > > > > > think by just exposing the simple method that executes in the I/O
> > > > > > thread you can easily implement pooled execution using the thread
> > > > > > pooling mechanism of your choice by just having the callback use
> > > > > > an executor to run the action (i.e. make an AsyncCallback that
> > > > > > takes a threadpool and a Runnable or something like that). This
> > > > > > gives the user full control over the executor (there are lots of
> > > > > > details around thread re-use in executors, thread factories, etc.,
> > > > > > and trying to expose configs for every variation would be a
> > > > > > pain). This also makes it totally transparent how it works; that
> > > > > > is, if we did expose all kinds of thread pool configs you would
> > > > > > still probably end up reading our code to figure out exactly what
> > > > > > they all did.
> > > > > >
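> > > > > > Concretely, the wrapping I have in mind is just something like
> > > > > > this (a sketch -- the callback interface shown is illustrative,
> > > > > > not the final API):
> > > > > >
> > > > > >   import java.util.concurrent.Executor;
> > > > > >
> > > > > >   // Runs the wrapped callback's action on the supplied executor
> > > > > >   // instead of on the producer I/O thread. Note that a
> > > > > >   // free-for-all pool gives up the per-partition ordering
> > > > > >   // described above unless you hash to threads by partition.
> > > > > >   public class AsyncCallback implements Callback {
> > > > > >       private final Executor executor;
> > > > > >       private final Callback action;
> > > > > >       public AsyncCallback(Executor executor, Callback action) {
> > > > > >           this.executor = executor;
> > > > > >           this.action = action;
> > > > > >       }
> > > > > >       public void onCompletion(final RecordSend send) {
> > > > > >           // the I/O thread only enqueues; the action runs in the pool
> > > > > >           executor.execute(new Runnable() {
> > > > > >               public void run() {
> > > > > >                   action.onCompletion(send);
> > > > > >               }
> > > > > >           });
> > > > > >       }
> > > > > >   }
> > > > > >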
> > > > > > -Jay
> > > > > >
> > > > > >
> > > > > > On Fri, Jan 31, 2014 at 9:39 AM, Oliver Dain <od...@3cinteractive.com> wrote:
> > > > > >
> > > > > > > Hmmm.. I should read the docs more carefully before I open my
> > > > > > > big mouth: I just noticed the KafkaProducer#send overload that
> > > > > > > takes a callback. That definitely helps address my concern,
> > > > > > > though I think the API would be cleaner if there was only one
> > > > > > > variant that returned a future and you could register the
> > > > > > > callback with the future. This is not nearly as important as I'd
> > > > > > > thought given the ability to register a callback - just a
> > > > > > > preference.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On 1/31/14, 9:33 AM, "Oliver Dain" <od...@3cinteractive.com> wrote:
> > > > > > >
> > > > > > > >Hey all,
> > > > > > > >
> > > > > > > >I'm excited about having a new Producer API, and I really like
> > > > > > > >the idea of removing the distinction between a synchronous and
> > > > > > > >asynchronous producer. The one comment I have about the current
> > > > > > > >API is that it's hard to write truly asynchronous code with the
> > > > > > > >type of future returned by the send method. The issue is that
> > > > > > > >send returns a RecordSend and there's no way to register a
> > > > > > > >callback with that object. It is therefore necessary to poll
> > > > > > > >the object periodically to see if the send has completed. So if
> > > > > > > >you have n send calls outstanding you have to check n
> > > > > > > >RecordSend objects, which is slow. In general this tends to
> > > > > > > >lead to people using one thread per send call and then calling
> > > > > > > >RecordSend#await, which removes much of the benefit of an async
> > > > > > > >API.
> > > > > > > >
> > > > > > > >I think it's much easier to write truly asynchronous code if
> > > > > > > >the returned future allows you to register a callback. That
> > > > > > > >way, instead of polling you can simply wait for the callback to
> > > > > > > >be called. A good example of the kind of thing I'm thinking of
> > > > > > > >is the ListenableFuture class in the Guava libraries:
> > > > > > > >
> > > > > > > >https://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained
> > > > > > > >
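> > > > > > > >For example, if send returned a ListenableFuture<RecordSend>,
> > > > > > > >the caller could write something like (just a sketch):
> > > > > > > >
> > > > > > > >  Futures.addCallback(producer.send(record), new FutureCallback<RecordSend>() {
> > > > > > > >      public void onSuccess(RecordSend send) { /* advance a highwater mark, etc. */ }
> > > > > > > >      public void onFailure(Throwable t) { /* log or retry */ }
> > > > > > > >  });
> > > > > > > >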
> > > > > > > >
> > > > > > > >HTH,
> > > > > > > >Oliver
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
