Hey Tom,

So is there one connection and I/O thread per broker and a low-level client for each of those, and then you hash into that to partition? Is it possible to batch across partitions or only within a partition?

-Jay

On Wed, Jan 29, 2014 at 8:41 AM, Tom Brown <tombrow...@gmail.com> wrote:

> Jay,
>
> There is both a basic client object, and a number of IO processing threads. The client object manages connections, creating new ones when new machines are connected, or when existing connections die. It also manages a queue of requests for each server. The IO processing thread has a selector, and performs the work of sending/receiving (removing items from the queue, interpreting the response at a basic level, etc). Since asynchronous sockets by nature decouple sending and receiving, request pipelining is inherent.
>
> Using the basic client, you can send individual produce requests (singular or batched). The "producer" layer adds an additional queue for each partition, allowing individual messages to be batched together.
>
> --Tom
>
> On Tue, Jan 28, 2014 at 9:10 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Hey Neha,
> >
> > Can you elaborate on why you prefer using Java's Future? The downside in my mind is the use of the checked InterruptedException and ExecutionException. ExecutionException is arguable, but forcing you to catch InterruptedException, often in code that can't be interrupted, seems perverse. It also leaves us with the cancel() method which I don't think we really can implement.
> >
> > Option 1A, to recap/elaborate, was the following. There is no Serializer or Partitioner api. We take a byte[] key and value and an optional integer partition. If you specify the integer partition it will be used. If you do not specify a key or a partition the partition will be chosen in a round robin fashion. If you specify a key but no partition we will choose a partition based on a hash of the key. In order to let the user find the partition we will need to give them access to the Cluster instance directly from the producer.
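
A minimal sketch of the Option 1A selection rules quoted above, in Java. The class name `Option1APartitionChooser`, its method signature, and the use of `Arrays.hashCode` as the hash are assumptions made for illustration only; the thread does not name a hash function, and in a real client the partition count would presumably come from the Cluster instance.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper illustrating Option 1A; not part of the proposed producer API. */
final class Option1APartitionChooser {
    // Counter used only for records that carry neither a key nor a partition.
    private final AtomicInteger roundRobin = new AtomicInteger(0);

    /**
     * @param key           serialized key, or null if the record has no key
     * @param partition     explicit partition, or null to let the chooser decide
     * @param numPartitions number of partitions for the topic (assumed known to the caller)
     */
    int choose(byte[] key, Integer partition, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (partition != null) {
            // Rule 1: an explicit partition is always used as given.
            if (partition < 0 || partition >= numPartitions) {
                throw new IllegalArgumentException("partition out of range: " + partition);
            }
            return partition;
        }
        if (key != null) {
            // Rule 3: key but no partition, so hash the key bytes.
            // floorMod keeps the result non-negative for negative hash codes.
            return Math.floorMod(Arrays.hashCode(key), numPartitions);
        }
        // Rule 2: neither key nor partition, so rotate through the partitions.
        return Math.floorMod(roundRobin.getAndIncrement(), numPartitions);
    }
}
```

With this shape an explicit partition always wins, a key without a partition maps to a stable partition, and records with neither are spread evenly.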
> >
> > -Jay
> >
> > On Tue, Jan 28, 2014 at 6:25 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >
> > > Here are more thoughts on the public APIs -
> > >
> > > - I suggest we use java's Future instead of custom Future especially since it is part of the public API
> > >
> > > - Serialization: I like the simplicity of the producer APIs with the absence of serialization where we just deal with byte arrays for keys and values. What I don't like about this is the performance overhead on the Partitioner for any kind of custom partitioning based on the partitionKey. Since the only purpose of partitionKey is to do custom partitioning, why can't we take it in directly as an integer and let the user figure out the mapping from partition_key -> partition_id using the getCluster() API? If I understand correctly, this is similar to what you suggested as part of option 1A. I like this approach since it maintains the simplicity of APIs by allowing us to deal with bytes and does not compromise performance in the custom partitioning case.
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Tue, Jan 28, 2014 at 5:42 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >
> > > > Hey Tom,
> > > >
> > > > That sounds cool. How did you end up handling parallel I/O if you wrap the individual connections? Don't you need some selector that selects over all the connections?
> > > >
> > > > -Jay
> > > >
> > > > On Tue, Jan 28, 2014 at 2:31 PM, Tom Brown <tombrow...@gmail.com> wrote:
> > > >
> > > > > I implemented a 0.7 client in pure java, and its API very closely resembled this. (When multiple people independently engineer the same solution, it's probably good... right?). However, there were a few architectural differences with my client:
> > > > >
> > > > > 1. The basic client itself was just an asynchronous layer around the different server functions. In and of itself it had no knowledge of partitions, only servers (and maintained TCP connections to them).
> > > > >
> > > > > 2. The main producer was an additional layer that provided a high-level interface that could batch individual messages based on partition.
> > > > >
> > > > > 3. Knowledge of partitioning was done via an interface so that different strategies could be used.
> > > > >
> > > > > 4. Partitioning was done by the user, with knowledge of the available partitions provided by #3.
> > > > >
> > > > > 5. Serialization was done by the user to simplify the API.
> > > > >
> > > > > 6. Futures were used to make asynchronous calls emulate synchronous ones.
> > > > >
> > > > > The main benefit of this approach is flexibility. For example, since the base client was just a managed connection (and not inherently a producer), it was easy to composite a produce request and an offsets request together into a confirmed produce request (officially not available in 0.7).
> > > > >
> > > > > Decoupling the basic client from partition management allowed me to implement zk discovery as a separate project so that the main project had no complex dependencies. The same was true of decoupling serialization. It's trivial to build an optional layer that adds those features in, while allowing access to the base APIs for those that need it.
> > > > >
> > > > > Using standard Future objects was also beneficial, since I could combine them with existing tools (such as guava).
> > > > >
> > > > > It may be too late to be of use, but I have been working with my company's legal department to release the implementation I described above. If you're interested in it, let me know.
> > > > >
> > > > > To sum up my thoughts regarding the new API, I think it's a great start. I would like to see a more layered approach so I can use the parts I want, and adapt the other parts as needed. I would also like to see standard interfaces (especially Future) used where they make sense.
> > > > >
> > > > > --Tom
> > > > >
> > > > > On Tue, Jan 28, 2014 at 1:33 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
> > > > >
> > > > > > +1 ListenableFuture: If this works similar to Deferreds in Twisted Python or Promised IO in Javascript, I think this is a great pattern for decoupling your callback logic from the place where the Future is generated. You can register as many callbacks as you like, each in the appropriate layer of the code and have each observer get notified when the promised i/o is complete without any of them knowing about each other.
> > > > > >
> > > > > > On Tue, Jan 28, 2014 at 11:32 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > >
> > > > > > > Hey Ross,
> > > > > > >
> > > > > > > - ListenableFuture: Interesting. That would be an alternative to the direct callback support we provide. There could be pros to this, let me think about it.
> > > > > > > - We could provide layering, but I feel that the serialization is such a small thing we should just make a decision and choose one, it doesn't seem to me to justify a whole public facing layer.
> > > > > > > - Yes, this is fairly esoteric, essentially I think it is fairly similar to databases like DynamoDB that allow you to specify two partition keys (I think DynamoDB does this...). The reasoning is that in fact there are several things you can use the key field for: (1) to compute the partition to store the data in, (2) as a unique identifier to deduplicate that partition's records within a log. These two things are almost always the same, but occasionally may differ when you want to group data in a more sophisticated way than just a hash of the primary key but still retain the proper primary key for delivery to the consumer and log compaction.
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > > > On Tue, Jan 28, 2014 at 3:24 AM, Ross Black <ross.w.bl...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Jay,
> > > > > > > >
> > > > > > > > - Just to add some more info/confusion about possibly using Future ... If Kafka uses a JDK future, it plays nicely with other frameworks as well. Google Guava has a ListenableFuture that allows callback handling to be added via the returned future, and allows the callbacks to be passed off to a specified executor.
> > > > > > > >
> > > > > > > > http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/ListenableFuture.html
> > > > > > > >
> > > > > > > > The JDK future can easily be converted to a listenable future.
> > > > > > > >
> > > > > > > > - On the question of byte[] vs Object, could this be solved by layering the API? eg. a raw producer (use byte[] and specify the partition number) and a normal producer (use generic object and specify a Partitioner)?
> > > > > > > >
> > > > > > > > - I am confused by the keys in ProducerRecord and Partitioner. What is the usage for both a key and a partition key? (I am not yet using 0.8)
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Ross
> > > > > > > >
> > > > > > > > On 28 January 2014 05:00, Xavier Stevens <xav...@gaikai.com> wrote:
> > > > > > > >
> > > > > > > > > AutoCloseable would be nice for us as most of our code is using Java 7 at this point.
> > > > > > > > >
> > > > > > > > > I like Dropwizard's configuration mapping to POJOs via Jackson, but if you wanted to stick with property maps I don't care enough to object.
> > > > > > > > >
> > > > > > > > > If the producer only dealt with bytes, is there a way we could still do partition plugins without specifying the number explicitly? I would prefer to be able to pass in field(s) that would be used by the partitioner. Obviously if this wasn't possible you could always deserialize the object in the partitioner and grab the fields you want, but that seems really expensive to do on every message.
> > > > > > > > >
> > > > > > > > > It would also be nice to have a Java API Encoder constructor taking in VerifiableProperties. Scala understands how to handle "props: VerifiableProperties = null", but Java doesn't. So you don't run into this problem until runtime.
> > > > > > > > >
> > > > > > > > > -Xavier
> > > > > > > > >
> > > > > > > > > On Mon, Jan 27, 2014 at 9:37 AM, Clark Breyman <cl...@breyman.com> wrote:
> > > > > > > > >
> > > > > > > > > > Jay -
> > > > > > > > > >
> > > > > > > > > > Config - your explanation makes sense. I'm just so accustomed to having Jackson automatically map my configuration objects to POJOs that I've stopped using property files. They are lingua franca. The only thought might be to separate the config interface from the implementation to allow for alternatives, but that might undermine your point of "do it this way so that everyone can find it where they expect it".
> > > > > > > > > >
> > > > > > > > > > Serialization: Of the options, I like 1A the best, though possibly with either an option to specify a partition key rather than ID or a helper to translate an arbitrary byte[] or long into a partition number.
> > > > > > > > > >
> > > > > > > > > > Thanks
> > > > > > > > > > Clark
> > > > > > > > > >
> > > > > > > > > > On Sun, Jan 26, 2014 at 9:13 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Thanks for the detailed thoughts. Let me elaborate on the config thing.
> > > > > > > > > > >
> > > > > > > > > > > I agree that at first glance key-value strings don't seem like a very good configuration api for a client. Surely a well-typed config class would be better! I actually disagree and let me see if I can convince you.
> > > > > > > > > > >
> > > > > > > > > > > My reasoning has nothing to do with the api and everything to do with operations.
> > > > > > > > > > >
> > > > > > > > > > > Clients are embedded in applications which are themselves configured. In any place that takes operations seriously the configuration for these applications will be version controlled and maintained through some kind of config management system. If we give a config class with getters and setters the application has to expose those properties to its configuration. What invariably happens is that the application exposes only a choice few properties that they thought they would change. Furthermore the application will make up a name for these configs that seems intuitive at the time in the 2 seconds the engineer spends thinking about it.
> > > > > > > > > > >
> > > > > > > > > > > Now consider the result of this in the large. You end up with dozens or hundreds of applications that have the client embedded. Each exposes a different, inadequate subset of the possible configs, each with different names. It is a nightmare.
> > > > > > > > > > >
> > > > > > > > > > > If you use a string-string map the config system can directly get a bundle of config key-value pairs and put them into the client. This means that all configuration is automatically available with the name documented on the website in every application that does this. If you upgrade to a new kafka version with more configs those will be exposed too. If you realize that you need to change a default you can just go through your configs and change it everywhere as it will have the same name everywhere.
> > > > > > > > > > >
> > > > > > > > > > > -Jay
> > > > > > > > > > >
> > > > > > > > > > > On Sun, Jan 26, 2014 at 4:47 PM, Clark Breyman <cl...@breyman.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Thanks Jay. I'll see if I can put together a more complete response, perhaps as separate threads so that topics don't get entangled. In the meantime, here are a couple of responses:
> > > > > > > > > > > >
> > > > > > > > > > > > Serialization: you've broken out a sub-thread so I'll reply there. My bias is that I like generics (except for type-erasure) and in particular they make it easy to compose serializers for compound payloads (e.g. when a common header wraps a payload of parameterized type). I'll respond to your 4-options message with an example.
> > > > > > > > > > > >
> > > > > > > > > > > > Build: I've seen a lot of "maven-compatible" build systems produce "artifacts" that aren't really artifacts - no embedded POM or, worse, malformed POM. I know the sbt-generated artifacts were this way - onus is on me to see what gradle is spitting out and what a maven build might look like. Maven may be old and boring, but it gets out of the way and integrates really seamlessly with a lot of IDEs. When some scala projects I was working on in the fall of 2011 switched from sbt to maven, build became a non-issue.
> > > > > > > > > > > >
> > > > > > > > > > > > Config: Not a big deal and no, I don't think a dropwizard dependency is appropriate. I do like using simple entity beans (POJOs not j2EE) for configuration, especially if they can be marshalled without annotation by Jackson. I only mentioned the dropwizard-extras because it has some entity bean versions of the ZK and Kafka configs.
> > > > > > > > > > > >
> > > > > > > > > > > > Domain-packaging: Also not a big deal - it's what's expected and it's pretty free in most IDEs. The advantage I see is that it is clear whether something is from the Apache Kafka project and whether something is from another org and related to Kafka. That said, nothing really enforces it.
> > > > > > > > > > > >
> > > > > > > > > > > > Futures: I'll see if I can create some examples to demonstrate Future making interop easier.
> > > > > > > > > > > >
> > > > > > > > > > > > Regards,
> > > > > > > > > > > > C
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Jan 24, 2014 at 4:36 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hey Clark,
> > > > > > > > > > > > >
> > > > > > > > > > > > > - Serialization: Yes I agree with these though I don't consider the loss of generics a big issue. I'll try to summarize what I would consider the best alternative api with raw byte[].
> > > > > > > > > > > > >
> > > > > > > > > > > > > - Maven: We had this debate a few months back and the consensus was gradle. Is there a specific issue with the poms gradle makes? I am extremely loath to revisit the issue as build issues are a recurring thing and no one ever agrees and ultimately our build needs are very simple.
> > > > > > > > > > > > >
> > > > > > > > > > > > > - Config: I'm not sure if I follow the point. Are you saying we should use something in dropwizard for config? One principle here is to try to remove as many client dependencies as possible as we inevitably run into terrible compatibility issues with users who use the same library or its dependencies at different versions. Or are you talking about maintaining compatibility with existing config parameters? I think as much as a config in the existing client makes sense it should have the same name (I was a bit sloppy about that so I'll fix any errors there). There are a few new things and we should give those reasonable defaults. I think config is important so I'll start a thread on the config package in there.
> > > > > > > > > > > > >
> > > > > > > > > > > > > - org.apache.kafka: We could do this. I always considered it kind of an odd thing Java programmers do that has no real motivation (but I could be re-educated!). I don't think it ends up reducing naming conflicts in practice and it adds a lot of noise and nested directories. Is there a reason you prefer this or just to be more standard?
> > > > > > > > > > > > >
> > > > > > > > > > > > > - Future: Basically I didn't see any particular advantage. The cancel() method doesn't really make sense so probably wouldn't work. Likewise I dislike the checked exceptions it requires. Basically I just wrote out some code examples and it seemed cleaner with a special purpose object. I wasn't actually aware of plans for improved futures in java 8 or the other integrations. Maybe you could elaborate on this a bit and show how it would be used? Sounds promising, I just don't know a lot about it.
> > > > > > > > > > > > >
> > > > > > > > > > > > > -Jay
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <cl...@breyman.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Jay - Thanks for the call for comments. Here's some initial input:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > - Make message serialization a client responsibility (making all messages byte[]). Reflection-based loading makes it harder to use generic codecs (e.g. Envelope<PREFIX, DATA, SUFFIX>) or build up codec programmatically. Non-default partitioning should require an explicit partition key.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > - I really like the fact that it will be native Java. Please consider using native maven and not sbt, gradle, ivy, etc as they don't reliably play nice in the maven ecosystem. A jar without a well-formed pom doesn't feel like a real artifact. The poms generated by sbt et al. are not well formed. Using maven will make builds and IDE integration much smoother.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > - Look at Nick Telford's dropwizard-extras package in which he defines some Jackson-compatible POJOs for loading configuration. Seems like your client migration is similar. The config objects should have constructors or factories that accept Map<String, String> and Properties for ease of migration.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > - Would you consider using the org.apache.kafka package for the new API (quibble)
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > - Why create your own futures rather than use java.util.concurrent.Future<Long> or similar? Standard futures will play nice with other reactive libs and things like J8's CompletableFuture.
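
As a rough illustration of the "constructors or factories that accept Map<String, String> and Properties" suggestion, and of the string-string config argument made earlier in the thread, here is a small Java sketch. `SketchProducerConfig`, its factory method names, and the `example.*` keys used with it are hypothetical; nothing here is taken from the actual ProducerConfig class.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical config holder; class and method names are illustrative only. */
final class SketchProducerConfig {
    private final Map<String, String> values;

    private SketchProducerConfig(Map<String, String> values) {
        // Defensive copy so later changes by the caller do not leak in.
        this.values = Collections.unmodifiableMap(new HashMap<>(values));
    }

    /** Factory for callers whose config system hands them a string-string map. */
    static SketchProducerConfig fromMap(Map<String, String> map) {
        return new SketchProducerConfig(map);
    }

    /** Factory for callers that still load java.util.Properties files. */
    static SketchProducerConfig fromProperties(Properties props) {
        Map<String, String> copy = new HashMap<>();
        // stringPropertyNames() also includes keys from any chained defaults.
        for (String name : props.stringPropertyNames()) {
            copy.put(name, props.getProperty(name));
        }
        return new SketchProducerConfig(copy);
    }

    /** Returns the configured value, or the supplied default if the key is absent. */
    String get(String key, String defaultValue) {
        String value = values.get(key);
        return value != null ? value : defaultValue;
    }
}
```

Because every key passes through unchanged, an application that forwards its whole config bundle exposes each setting under the same name everywhere, which is the operational point argued for above.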
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks again,
> > > > > > > > > > > > > > C
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > A couple comments:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 1) Why does the config use a broker list instead of discovering the brokers in ZooKeeper? It doesn't match the HighLevelConsumer API.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 2) It looks like broker connections are created on demand. I'm wondering if sometimes you might want to flush out config or network connectivity issues before pushing the first message through.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Should there also be a KafkaProducer.connect() or .open() method or connectAll()? I guess it would try to connect to all brokers in the BROKER_LIST_CONFIG
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > HTH,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Roger
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > As mentioned in a previous email we are working on a re-implementation of the producer. I would like to use this email thread to discuss the details of the public API and the configuration. I would love for us to be incredibly picky about this public api now so it is as good as possible and we don't need to break it in the future.

The best way to get a feel for the API is actually to take a look at the javadoc; my hope is to get the api docs good enough so that it is self-explanatory:
http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html

Please take a look at this API and give me any thoughts you may have!

It may also be reasonable to take a look at the configs:
http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html

The actual code is posted here:
https://issues.apache.org/jira/browse/KAFKA-1227

A few questions or comments to kick things off:
1. We need to make a decision on whether serialization of the user's key and value should be done by the user (with our api just taking byte[]) or if we should take an object and allow the user to configure a Serializer class which we instantiate via reflection. We take the latter approach in the current producer, and I have carried this through to this prototype. The tradeoff I see is this: taking byte[] is actually simpler, since the user can directly do whatever serialization they like. The complication is actually partitioning. Currently partitioning is done by a similar plug-in api (Partitioner) which the user can implement and configure to override how partitions are assigned. If we take byte[] as input then we have no access to the original object and partitioning MUST be done on the byte[].
This is fine for hash partitioning. However for various types of semantic partitioning (range partitioning, or whatever) you would want access to the original object. In the current approach a producer who wishes to send byte[] they have serialized in their own code can configure the BytesSerialization we supply which is just a "no op" serialization.
2. We should obsess over naming and make sure each of the class names is good.
3. Jun has already pointed out that we need to include the topic and partition in the response, which is absolutely right. I haven't done that yet but that definitely needs to be there.
4. Currently RecordSend.await will throw an exception if the request failed. The intention here is that producer.send(message).await() exactly simulates a synchronous call.
Guozhang has noted that this is a little annoying since the user must then catch exceptions. However if we remove this then if the user doesn't check for errors they won't know one has occurred, which I predict will be a common mistake.
5. Perhaps there is more we could do to make the async callbacks and future we give back intuitive and easy to program against?

Some background info on implementation:

At a high level the primary difference in this producer is that it removes the distinction between the "sync" and "async" producer. Effectively all requests are sent asynchronously but always return a future response object that gives the offset as well as any error that may have occurred when the request is complete.
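
To make the "future response object" idea concrete, here is a minimal sketch of how such an object could behave. It is not the actual RecordSend class from the patch: the class name RecordSendSketch, the complete() method, and the choice to rethrow an unchecked exception from await() are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical stand-in for the RecordSend discussed in this thread; not the real Kafka class. */
final class RecordSendSketch {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile long offset = -1L;
    private volatile RuntimeException error;

    /** Called by an (imagined) I/O thread once the broker has acknowledged or rejected the request. */
    void complete(long offset, RuntimeException error) {
        this.offset = offset;
        this.error = error;
        done.countDown();
    }

    /** Non-blocking check of whether the request has finished. */
    boolean completed() {
        return done.getCount() == 0;
    }

    /** Blocks until the request finishes, then rethrows any error so it cannot be silently missed. */
    RecordSendSketch await() {
        boolean interrupted = false;
        while (true) {
            try {
                done.await();
                break;
            } catch (InterruptedException e) {
                interrupted = true; // keep waiting; restore the flag afterwards
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        if (error != null) {
            throw error;
        }
        return this;
    }

    /** Waits for completion and returns the offset assigned to the message. */
    long offset() {
        await();
        return offset;
    }

    public static void main(String[] args) {
        RecordSendSketch send = new RecordSendSketch();
        // Simulate the I/O thread completing the request a little later.
        new Thread(() -> send.complete(42L, null)).start();
        System.out.println("offset = " + send.await().offset());
    }
}
```

With this shape, calling await() immediately after an asynchronous send reads like a blocking call, and a failed request surfaces as an exception instead of an unchecked error field.
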
The batching that is done in the async producer only today is done whenever possible now. This means that the sync producer, under load, can get performance as good as the async producer (preliminary results show the producer getting 1m messages/sec). This works similarly to group commit in databases but with respect to the actual network transmission--any messages that arrive while a send is in progress are batched together. It is also possible to encourage batching even under low load to save server resources by introducing a delay on the send to allow more messages to accumulate; this is done using the linger.ms config (this is similar to Nagle's algorithm in TCP).
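
The batching behaviour described above can be sketched roughly as follows. This is an illustration only, not the accumulator in the patch: the class LingerBatcherSketch, its method names, and the maxBatchSize cap are assumptions, and the clock is passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical accumulator illustrating linger-based batching; not Kafka's actual implementation. */
final class LingerBatcherSketch {
    private final int maxBatchSize; // assumed cap on messages per batch
    private final long lingerMs;    // how long the oldest message may wait for company
    private final List<byte[]> pending = new ArrayList<>();
    private long firstAppendMs = -1L;

    LingerBatcherSketch(int maxBatchSize, long lingerMs) {
        this.maxBatchSize = maxBatchSize;
        this.lingerMs = lingerMs;
    }

    /** Queue a message and remember when the oldest pending message arrived. */
    synchronized void append(byte[] message, long nowMs) {
        if (pending.isEmpty()) {
            firstAppendMs = nowMs;
        }
        pending.add(message);
    }

    /** True when the batch is full or the oldest message has lingered long enough. */
    synchronized boolean ready(long nowMs) {
        if (pending.isEmpty()) {
            return false;
        }
        return pending.size() >= maxBatchSize || nowMs - firstAppendMs >= lingerMs;
    }

    /** Hand everything accumulated so far to the sender as a single batch. */
    synchronized List<byte[]> drain() {
        List<byte[]> batch = new ArrayList<>(pending);
        pending.clear();
        firstAppendMs = -1L;
        return batch;
    }

    public static void main(String[] args) {
        LingerBatcherSketch batcher = new LingerBatcherSketch(100, 5);
        batcher.append("a".getBytes(), 0);
        batcher.append("b".getBytes(), 3);
        System.out.println("ready at t=3ms? " + batcher.ready(3));
        System.out.println("ready at t=5ms? " + batcher.ready(5));
        System.out.println("batch size: " + batcher.drain().size());
    }
}
```

With lingerMs set to 0 a batch is ready as soon as one message is pending, so batching only happens when messages pile up while the sender is busy (the group-commit effect). A positive linger trades a little latency for larger batches under low load.
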

This producer does all network communication asynchronously and in parallel to all servers so the performance penalty for acks=-1 and waiting on replication should be much reduced. I haven't done much benchmarking on this yet, though.

The high level design is described a little here, though this is now a little out of date:
https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite

-Jay