+1 ListenableFuture: If this works similarly to Deferreds in Twisted Python
or promised I/O in JavaScript, I think this is a great pattern for
decoupling your callback logic from the place where the Future is
generated. You can register as many callbacks as you like, each in the
appropriate layer of the code, and have each observer notified when the
promised I/O completes without any of them knowing about each other.
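
To make the pattern concrete, here is a minimal sketch. It uses the JDK's CompletableFuture (Java 8+) as a stand-in rather than Guava's ListenableFuture, and the class name, the "offset" result, and the layer names are all made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch only: CompletableFuture stands in for a listenable future.
// CallbackLayers, the "offset" result and the layer names are hypothetical.
class CallbackLayers {

    // Registers two independent observers on the same future, completes it,
    // and returns what each observer recorded.
    static List<String> run(long offset) {
        List<String> events = new CopyOnWriteArrayList<>();
        CompletableFuture<Long> sendResult = new CompletableFuture<>();

        // Metrics layer: knows only about the future, not about other observers.
        sendResult.thenAccept(o -> events.add("metrics saw offset " + o));
        // Application layer: registered separately, elsewhere in the code.
        sendResult.thenAccept(o -> events.add("app saw offset " + o));

        // The I/O layer completes the future; both callbacks are notified.
        sendResult.complete(offset);
        return events;
    }

    public static void main(String[] args) {
        // The order in which the two callbacks run is not guaranteed.
        run(42L).forEach(System.out::println);
    }
}
```

Neither callback references the other, which is the decoupling I have in mind; whether the new producer's future would look like this is an open question.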


On Tue, Jan 28, 2014 at 11:32 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Hey Ross,
>
> - ListenableFuture: Interesting. That would be an alternative to the direct
> callback support we provide. There could be pros to this, let me think
> about it.
> - We could provide layering, but I feel that the serialization is such a
> small thing that we should just make a decision and choose one; it doesn't
> seem to me to justify a whole public-facing layer.
> - Yes, this is fairly esoteric, essentially I think it is fairly similar to
> databases like DynamoDB that allow you to specify two partition keys (I
> think DynamoDB does this...). The reasoning is that in fact there are
> several things you can use the key field for: (1) to compute the partition
> to store the data in, (2) as a unique identifier to deduplicate that
> partition's records within a log. These two things are almost always the
> same, but occasionally may differ when you want to group data in a more
> sophisticated way than just a hash of the primary key but still retain the
> proper primary key for delivery to the consumer and log compaction.
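
A minimal sketch of the two roles a key can play, under stated assumptions: KeyedRecord and partitionFor are hypothetical names rather than the proposed ProducerRecord or Partitioner API, and hashing the byte[] is just one possible routing rule.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch only: hypothetical types, not the proposed Kafka API.
class PartitionKeySketch {

    static final class KeyedRecord {
        final byte[] key;          // identity: delivered to consumers, used for compaction
        final byte[] partitionKey; // optional grouping key; null means "route by key"
        final byte[] value;

        KeyedRecord(byte[] key, byte[] partitionKey, byte[] value) {
            this.key = key;
            this.partitionKey = partitionKey;
            this.value = value;
        }
    }

    // Chooses a partition from the partition key when present, else from the key.
    static int partitionFor(KeyedRecord record, int numPartitions) {
        byte[] routing = record.partitionKey != null ? record.partitionKey : record.key;
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(Arrays.hashCode(routing), numPartitions);
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Two orders with distinct primary keys, grouped by the same customer.
        KeyedRecord a = new KeyedRecord(utf8("order-1"), utf8("customer-7"), utf8("created"));
        KeyedRecord b = new KeyedRecord(utf8("order-2"), utf8("customer-7"), utf8("shipped"));
        System.out.println(partitionFor(a, 8) == partitionFor(b, 8)); // prints true
    }
}
```

Both records land in the same partition because they share a partition key, while each keeps its own primary key.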
>
> -Jay
>
>
> On Tue, Jan 28, 2014 at 3:24 AM, Ross Black <ross.w.bl...@gmail.com> wrote:
>
> > Hi Jay,
> >
> > - Just to add some more info/confusion about possibly using Future ...
> >   If Kafka uses a JDK future, it plays nicely with other frameworks as
> > well.
> >   Google Guava has a ListenableFuture that allows callback handling to be
> > added via the returned future, and allows the callbacks to be passed off to
> > a specified executor.
> >
> > http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/ListenableFuture.html
> >   The JDK future can easily be converted to a listenable future.
> >
> > - On the question of byte[] vs Object, could this be solved by layering the
> > API? E.g. a raw producer (use byte[] and specify the partition number) and
> > a normal producer (use generic object and specify a Partitioner)?
> >
> > - I am confused by the keys in ProducerRecord and Partitioner. What is the
> > usage for both a key and a partition key? (I am not yet using 0.8)
> >
> >
> > Thanks,
> > Ross
> >
> >
> >
> > On 28 January 2014 05:00, Xavier Stevens <xav...@gaikai.com> wrote:
> >
> > > AutoCloseable would be nice for us as most of our code is using Java 7 at
> > > this point.
> > >
> > > I like Dropwizard's configuration mapping to POJOs via Jackson, but if you
> > > wanted to stick with property maps I don't care enough to object.
> > >
> > > If the producer only dealt with bytes, is there a way we could still do
> > > partition plugins without specifying the number explicitly? I would prefer
> > > to be able to pass in field(s) that would be used by the partitioner.
> > > Obviously if this wasn't possible you could always deserialize the object
> > > in the partitioner and grab the fields you want, but that seems really
> > > expensive to do on every message.
> > >
> > > It would also be nice to have a Java API Encoder constructor taking in
> > > VerifiableProperties. Scala understands how to handle "props:
> > > VerifiableProperties = null", but Java doesn't. So you don't run into this
> > > problem until runtime.
> > >
> > >
> > > -Xavier
> > >
> > >
> > > On Mon, Jan 27, 2014 at 9:37 AM, Clark Breyman <cl...@breyman.com> wrote:
> > >
> > > > Jay -
> > > >
> > > > Config - your explanation makes sense. I'm just so accustomed to having
> > > > Jackson automatically map my configuration objects to POJOs that I've
> > > > stopped using property files. They are lingua franca. The only thought
> > > > might be to separate the config interface from the implementation to allow
> > > > for alternatives, but that might undermine your point of "do it this way so
> > > > that everyone can find it where they expect it".
> > > >
> > > > Serialization: Of the options, I like 1A the best, though possibly with
> > > > either an option to specify a partition key rather than ID or a helper to
> > > > translate an arbitrary byte[] or long into a partition number.
> > > >
> > > > Thanks
> > > > Clark
> > > >
> > > >
> > > > On Sun, Jan 26, 2014 at 9:13 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > >
> > > > > Thanks for the detailed thoughts. Let me elaborate on the config thing.
> > > > >
> > > > > I agree that at first glance key-value strings don't seem like a very good
> > > > > configuration api for a client. Surely a well-typed config class would be
> > > > > better! I actually disagree and let me see if I can convince you.
> > > > >
> > > > > My reasoning has nothing to do with the api and everything to do with
> > > > > operations.
> > > > >
> > > > > Clients are embedded in applications which are themselves configured. In
> > > > > any place that takes operations seriously the configuration for these
> > > > > applications will be version controlled and maintained through some kind of
> > > > > config management system. If we give a config class with getters and
> > > > > setters the application has to expose those properties to its
> > > > > configuration. What invariably happens is that the application exposes only
> > > > > a choice few properties that they thought they would change. Furthermore
> > > > > the application will make up a name for these configs that seems intuitive
> > > > > at the time in the 2 seconds the engineer spends thinking about it.
> > > > >
> > > > > Now consider the result of this in the large. You end up with dozens or
> > > > > hundreds of applications that have the client embedded. Each exposes a
> > > > > different, inadequate subset of the possible configs, each with different
> > > > > names. It is a nightmare.
> > > > >
> > > > > If you use a string-string map the config system can directly get a bundle
> > > > > of config key-value pairs and put them into the client. This means that all
> > > > > configuration is automatically available with the name documented on the
> > > > > website in every application that does this. If you upgrade to a new Kafka
> > > > > version with more configs those will be exposed too. If you realize that
> > > > > you need to change a default you can just go through your configs and
> > > > > change it everywhere as it will have the same name everywhere.
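
A minimal sketch of the pass-through idea, assuming the application keeps client settings under a prefix in its own config. The "kafka.producer." prefix, the ConfigPassThrough class, and the "app.name" entry are hypothetical; linger.ms and acks are the config names mentioned elsewhere in this thread.

```java
import java.util.Properties;

// Sketch only: the prefix convention and class name are assumptions.
class ConfigPassThrough {

    // Copies every entry under the prefix into the client config, stripping
    // the prefix, so the client sees its documented config names unchanged.
    static Properties clientConfig(Properties appConfig, String prefix) {
        Properties client = new Properties();
        for (String name : appConfig.stringPropertyNames()) {
            if (name.startsWith(prefix)) {
                client.setProperty(name.substring(prefix.length()),
                                   appConfig.getProperty(name));
            }
        }
        return client;
    }

    public static void main(String[] args) {
        Properties app = new Properties();
        app.setProperty("app.name", "billing");
        app.setProperty("kafka.producer.linger.ms", "5");
        app.setProperty("kafka.producer.acks", "-1");

        Properties client = clientConfig(app, "kafka.producer.");
        System.out.println(client.getProperty("linger.ms")); // prints 5
    }
}
```

The application never enumerates individual client settings, so a config added in a later client version passes through without any code change.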
> > > > >
> > > > > -Jay
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Sun, Jan 26, 2014 at 4:47 PM, Clark Breyman <cl...@breyman.com> wrote:
> > > > >
> > > > > > Thanks Jay. I'll see if I can put together a more complete response,
> > > > > > perhaps as separate threads so that topics don't get entangled. In the
> > > > > > meantime, here are a couple of responses:
> > > > > >
> > > > > > Serialization: you've broken out a sub-thread so I'll reply there. My bias
> > > > > > is that I like generics (except for type-erasure) and in particular they
> > > > > > make it easy to compose serializers for compound payloads (e.g. when a
> > > > > > common header wraps a payload of parameterized type). I'll respond to your
> > > > > > 4-options message with an example.
> > > > > >
> > > > > > Build: I've seen a lot of "maven-compatible" build systems produce
> > > > > > "artifacts" that aren't really artifacts - no embedded POM or, worse,
> > > > > > malformed POM. I know the sbt-generated artifacts were this way - onus is
> > > > > > on me to see what gradle is spitting out and what a maven build might look
> > > > > > like. Maven may be old and boring, but it gets out of the way and
> > > > > > integrates really seamlessly with a lot of IDEs. When some scala projects I
> > > > > > was working on in the fall of 2011 switched from sbt to maven, build became
> > > > > > a non-issue.
> > > > > >
> > > > > > Config: Not a big deal, and no, I don't think a dropwizard dependency is
> > > > > > appropriate. I do like using simple entity beans (POJOs, not J2EE) for
> > > > > > configuration, especially if they can be marshalled without annotation by
> > > > > > Jackson. I only mentioned the dropwizard-extras because it has some entity
> > > > > > bean versions of the ZK and Kafka configs.
> > > > > >
> > > > > > Domain-packaging: Also not a big deal - it's what's expected and it's
> > > > > > pretty free in most IDEs. The advantage I see is that it is clear whether
> > > > > > something is from the Apache Kafka project and whether something is from
> > > > > > another org and related to Kafka. That said, nothing really enforces it.
> > > > > >
> > > > > > Futures: I'll see if I can create some examples to demonstrate Future
> > > > > > making interop easier.
> > > > > >
> > > > > > Regards,
> > > > > > C
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Fri, Jan 24, 2014 at 4:36 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > >
> > > > > > > Hey Clark,
> > > > > > >
> > > > > > > - Serialization: Yes I agree with these though I don't consider the loss of
> > > > > > > generics a big issue. I'll try to summarize what I would consider the best
> > > > > > > alternative api with raw byte[].
> > > > > > >
> > > > > > > - Maven: We had this debate a few months back and the consensus was gradle.
> > > > > > > Is there a specific issue with the poms gradle makes? I am extremely loath
> > > > > > > to revisit the issue as build issues are a recurring thing and no one ever
> > > > > > > agrees and ultimately our build needs are very simple.
> > > > > > >
> > > > > > > - Config: I'm not sure if I follow the point. Are you saying we should use
> > > > > > > something in dropwizard for config? One principle here is to try to remove
> > > > > > > as many client dependencies as possible as we inevitably run into terrible
> > > > > > > compatibility issues with users who use the same library or its
> > > > > > > dependencies at different versions. Or are you talking about maintaining
> > > > > > > compatibility with existing config parameters? I think as much as a config
> > > > > > > in the existing client makes sense it should have the same name (I was a
> > > > > > > bit sloppy about that so I'll fix any errors there). There are a few new
> > > > > > > things and we should give those reasonable defaults. I think config is
> > > > > > > important so I'll start a thread on the config package in there.
> > > > > > >
> > > > > > > - org.apache.kafka: We could do this. I always considered it kind of an odd
> > > > > > > thing Java programmers do that has no real motivation (but I could be
> > > > > > > re-educated!). I don't think it ends up reducing naming conflicts in
> > > > > > > practice and it adds a lot of noise and nested directories. Is there a
> > > > > > > reason you prefer this or just to be more standard?
> > > > > > >
> > > > > > > - Future: Basically I didn't see any particular advantage. The cancel()
> > > > > > > method doesn't really make sense so probably wouldn't work. Likewise I
> > > > > > > dislike the checked exceptions it requires. Basically I just wrote out some
> > > > > > > code examples and it seemed cleaner with a special purpose object. I wasn't
> > > > > > > actually aware of plans for improved futures in java 8 or the other
> > > > > > > integrations. Maybe you could elaborate on this a bit and show how it would
> > > > > > > be used? Sounds promising, I just don't know a lot about it.
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <cl...@breyman.com> wrote:
> > > > > > >
> > > > > > > > Jay - Thanks for the call for comments. Here's some initial input:
> > > > > > > >
> > > > > > > > - Make message serialization a client responsibility (making all messages
> > > > > > > > byte[]). Reflection-based loading makes it harder to use generic codecs
> > > > > > > > (e.g. Envelope<PREFIX, DATA, SUFFIX>) or build up codec programmatically.
> > > > > > > > Non-default partitioning should require an explicit partition key.
> > > > > > > >
> > > > > > > > - I really like the fact that it will be native Java. Please consider using
> > > > > > > > native maven and not sbt, gradle, ivy, etc as they don't reliably play nice
> > > > > > > > in the maven ecosystem. A jar without a well-formed pom doesn't feel like a
> > > > > > > > real artifact. The POMs generated by sbt et al. are not well formed. Using
> > > > > > > > maven will make builds and IDE integration much smoother.
> > > > > > > >
> > > > > > > > - Look at Nick Telford's dropwizard-extras package in which he defines some
> > > > > > > > Jackson-compatible POJOs for loading configuration. Seems like your client
> > > > > > > > migration is similar. The config objects should have constructors or
> > > > > > > > factories that accept Map<String, String> and Properties for ease of
> > > > > > > > migration.
> > > > > > > >
> > > > > > > > - Would you consider using the org.apache.kafka package for the new API
> > > > > > > > (quibble)
> > > > > > > >
> > > > > > > > - Why create your own futures rather than use
> > > > > > > > java.util.concurrent.Future<Long> or similar? Standard futures will play
> > > > > > > > nice with other reactive libs and things like J8's CompletableFuture.
> > > > > > > >
> > > > > > > > Thanks again,
> > > > > > > > C
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > A couple comments:
> > > > > > > > >
> > > > > > > > > 1) Why does the config use a broker list instead of discovering the brokers
> > > > > > > > > in ZooKeeper? It doesn't match the HighLevelConsumer API.
> > > > > > > > >
> > > > > > > > > 2) It looks like broker connections are created on demand. I'm wondering
> > > > > > > > > if sometimes you might want to flush out config or network connectivity
> > > > > > > > > issues before pushing the first message through.
> > > > > > > > >
> > > > > > > > > Should there also be a KafkaProducer.connect() or .open() method or
> > > > > > > > > connectAll()? I guess it would try to connect to all brokers in the
> > > > > > > > > BROKER_LIST_CONFIG
> > > > > > > > >
> > > > > > > > > HTH,
> > > > > > > > >
> > > > > > > > > Roger
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > As mentioned in a previous email we are working on a re-implementation of
> > > > > > > > > > the producer. I would like to use this email thread to discuss the details
> > > > > > > > > > of the public API and the configuration. I would love for us to be
> > > > > > > > > > incredibly picky about this public api now so it is as good as possible and
> > > > > > > > > > we don't need to break it in the future.
> > > > > > > > > >
> > > > > > > > > > The best way to get a feel for the API is actually to take a look at the
> > > > > > > > > > javadoc, my hope is to get the api docs good enough so that it is
> > > > > > > > > > self-explanatory:
> > > > > > > > > >
> > > > > > > > > > http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
> > > > > > > > > >
> > > > > > > > > > Please take a look at this API and give me any thoughts you may have!
> > > > > > > > > >
> > > > > > > > > > It may also be reasonable to take a look at the configs:
> > > > > > > > > >
> > > > > > > > > > http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
> > > > > > > > > >
> > > > > > > > > > The actual code is posted here:
> > > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-1227
> > > > > > > > > >
> > > > > > > > > > A few questions or comments to kick things off:
> > > > > > > > > > 1. We need to make a decision on whether serialization of the user's key
> > > > > > > > > > and value should be done by the user (with our api just taking byte[]) or
> > > > > > > > > > if we should take an object and allow the user to configure a Serializer
> > > > > > > > > > class which we instantiate via reflection. We take the latter approach in
> > > > > > > > > > the current producer, and I have carried this through to this prototype.
> > > > > > > > > > The tradeoff I see is this: taking byte[] is actually simpler, the user can
> > > > > > > > > > directly do whatever serialization they like. The complication is actually
> > > > > > > > > > partitioning. Currently partitioning is done by a similar plug-in api
> > > > > > > > > > (Partitioner) which the user can implement and configure to override how
> > > > > > > > > > partitions are assigned. If we take byte[] as input then we have no access
> > > > > > > > > > to the original object and partitioning MUST be done on the byte[]. This is
> > > > > > > > > > fine for hash partitioning. However for various types of semantic
> > > > > > > > > > partitioning (range partitioning, or whatever) you would want access to the
> > > > > > > > > > original object. In the current approach a producer who wishes to send
> > > > > > > > > > byte[] they have serialized in their own code can configure the
> > > > > > > > > > BytesSerialization we supply which is just a "no op" serialization.
> > > > > > > > > > 2. We should obsess over naming and make sure each of the class names is
> > > > > > > > > > good.
> > > > > > > > > > 3. Jun has already pointed out that we need to include the topic and
> > > > > > > > > > partition in the response, which is absolutely right. I haven't done that
> > > > > > > > > > yet but that definitely needs to be there.
> > > > > > > > > > 4. Currently RecordSend.await will throw an exception if the request
> > > > > > > > > > failed. The intention here is that producer.send(message).await() exactly
> > > > > > > > > > simulates a synchronous call. Guozhang has noted that this is a little
> > > > > > > > > > annoying since the user must then catch exceptions. However if we remove
> > > > > > > > > > this then if the user doesn't check for errors they won't know one has
> > > > > > > > > > occurred, which I predict will be a common mistake.
> > > > > > > > > > 5. Perhaps there is more we could do to make the async callbacks and future
> > > > > > > > > > we give back intuitive and easy to program against?
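
A minimal sketch of the behaviour described in point 4: blocking on the pending result either returns the offset or throws, so a failed send cannot go unnoticed. CompletableFuture stands in for the special-purpose RecordSend object, and AwaitSketch with its static await helper is hypothetical, not the prototype's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Sketch only: a stand-in for the send-then-await idea, not the real RecordSend.
class AwaitSketch {

    // Blocks until the send finishes and returns the offset. A failed send
    // surfaces as an unchecked exception carrying the original cause.
    static long await(CompletableFuture<Long> pending) {
        try {
            return pending.join();
        } catch (CompletionException e) {
            throw new IllegalStateException("send failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Long> ok = CompletableFuture.completedFuture(17L);
        System.out.println(await(ok)); // prints 17

        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("broker unavailable"));
        try {
            await(failed);
        } catch (IllegalStateException e) {
            System.out.println(e.getCause().getMessage()); // prints broker unavailable
        }
    }
}
```

Because the exception is unchecked, callers are not forced to catch it, but ignoring it still stops the calling code rather than silently dropping the error.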
> > > > > > > > > >
> > > > > > > > > > Some background info on implementation:
> > > > > > > > > >
> > > > > > > > > > At a high level the primary difference in this producer is that it removes
> > > > > > > > > > the distinction between the "sync" and "async" producer. Effectively all
> > > > > > > > > > requests are sent asynchronously but always return a future response object
> > > > > > > > > > that gives the offset as well as any error that may have occurred when the
> > > > > > > > > > request is complete. The batching that is done in the async producer only
> > > > > > > > > > today is done whenever possible now. This means that the sync producer,
> > > > > > > > > > under load, can get performance as good as the async producer (preliminary
> > > > > > > > > > results show the producer getting 1m messages/sec). This works similarly to
> > > > > > > > > > group commit in databases but with respect to the actual network
> > > > > > > > > > transmission--any messages that arrive while a send is in progress are
> > > > > > > > > > batched together. It is also possible to encourage batching even under low
> > > > > > > > > > load to save server resources by introducing a delay on the send to allow
> > > > > > > > > > more messages to accumulate; this is done using the linger.ms config (this
> > > > > > > > > > is similar to Nagle's algorithm in TCP).
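
A minimal sketch of the linger idea, not the producer's actual accumulator: messages are held until the batch is full or the oldest pending message has waited long enough. LingerBatcher and its parameters are hypothetical, and time is passed in explicitly so the behaviour is deterministic; a real implementation would also need a timer to flush a batch when no further message arrives.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: hypothetical batcher illustrating a linger delay.
class LingerBatcher {
    private final int maxBatchSize;
    private final long lingerMs;
    private final List<String> pending = new ArrayList<>();
    private long firstArrivalMs;

    LingerBatcher(int maxBatchSize, long lingerMs) {
        this.maxBatchSize = maxBatchSize;
        this.lingerMs = lingerMs;
    }

    // Adds a message arriving at nowMs. Returns the batch to send if it is
    // full or has lingered long enough, otherwise an empty list.
    List<String> add(String message, long nowMs) {
        if (pending.isEmpty()) {
            firstArrivalMs = nowMs; // the linger clock starts with the first message
        }
        pending.add(message);
        boolean full = pending.size() >= maxBatchSize;
        boolean lingered = nowMs - firstArrivalMs >= lingerMs;
        if (full || lingered) {
            List<String> batch = new ArrayList<>(pending);
            pending.clear();
            return batch;
        }
        return new ArrayList<>();
    }

    public static void main(String[] args) {
        LingerBatcher batcher = new LingerBatcher(3, 10);
        System.out.println(batcher.add("a", 0)); // prints []
        System.out.println(batcher.add("b", 5)); // prints []
        System.out.println(batcher.add("c", 6)); // prints [a, b, c]
    }
}
```

With a linger of 0 every message is sent as soon as it arrives, which corresponds to the no-delay case.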
> > > > > > > > > >
> > > > > > > > > > This producer does all network communication asynchronously and in parallel
> > > > > > > > > > to all servers so the performance penalty for acks=-1 and waiting on
> > > > > > > > > > replication should be much reduced. I haven't done much benchmarking on
> > > > > > > > > > this yet, though.
> > > > > > > > > >
> > > > > > > > > > The high level design is described a little here, though this is now a
> > > > > > > > > > little out of date:
> > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite
> > > > > > > > > >
> > > > > > > > > > -Jay
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
