Is the new producer API going to maintain protocol compatibility with the old version of the API under the hood?
> On Jan 29, 2014, at 10:15, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> The challenge of directly exposing ProduceRequestResult is that the offset provided is just the base offset. Because batching is transparent and non-deterministic, there is no way to know, for a particular message, where it sits relative to that base offset. So I think we do need some kind of per-message result.
>
> I started with Future<RequestResult>, I think for the same reason you prefer it, but I moved away from it for two reasons:
> 1. When I actually wrote out some code samples of usage they were a little ugly: checked exceptions, methods we can't implement, no helper methods, etc.
> 2. I originally intended to make the result of send() work like a ListenableFuture, so that you would register the callback on the result rather than as part of the call. I moved away from this primarily because the implementation complexity was a little higher.
>
> Whether or not the code prettiness on its own outweighs the familiarity of a normal Future I don't know, but that was the evolution of my thinking.
>
> -Jay
>
>> On Wed, Jan 29, 2014 at 10:06 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>
>> Hey Neha,
>>
>> Error handling in RecordSend works as in Future: you will get the exception, if there is one, from any of the accessor methods or await().
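Jay's base-offset point can be made concrete with a small sketch. It assumes, for illustration only, two things. First, the broker acknowledges a whole batch with a single base offset. Second, each appended record remembers its index within the batch. The names `Batch`, `BatchResult`, and `RecordResult` are hypothetical and are not the producer's actual classes. The sketch is single-threaded for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical: one acknowledgement per batch, carrying only the base offset.
final class BatchResult {
    private long baseOffset = -1L;
    private boolean done = false;

    void complete(long baseOffset) {
        this.baseOffset = baseOffset;
        this.done = true;
    }

    long baseOffset() {
        if (!done) throw new IllegalStateException("batch not yet acknowledged");
        return baseOffset;
    }
}

// Hypothetical per-message handle: it shares the batch result and remembers
// where this message landed in the batch, which the caller cannot know
// because batching is transparent to it.
final class RecordResult {
    private final BatchResult batch;
    private final int relativeOffset;

    RecordResult(BatchResult batch, int relativeOffset) {
        this.batch = batch;
        this.relativeOffset = relativeOffset;
    }

    long offset() {
        return batch.baseOffset() + relativeOffset;
    }
}

final class Batch {
    private final BatchResult result = new BatchResult();
    private final List<byte[]> records = new ArrayList<>();

    // Appending returns a per-message handle tied to the shared batch result.
    RecordResult append(byte[] value) {
        records.add(value);
        return new RecordResult(result, records.size() - 1);
    }

    // Called when the broker acknowledges the whole batch.
    void acknowledge(long baseOffset) {
        result.complete(baseOffset);
    }
}
```

Suppose two records are appended and the batch is acknowledged with base offset 100. The second record's handle then reports offset 101, and the caller never needed to know which batch it landed in.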
>>
>> The purpose of hasError was that you can write things slightly more simply (which some people expressed a preference for):
>>
>>     if (send.hasError()) {
>>         // do something
>>     }
>>     long offset = send.offset();
>>
>> instead of the slightly longer:
>>
>>     try {
>>         long offset = send.offset();
>>     } catch (KafkaException e) {
>>         // do something
>>     }
>>
>>> On Wed, Jan 29, 2014 at 10:01 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>
>>> Regarding the use of Futures -
>>>
>>> Agree that there are some downsides to using Futures, but both approaches have some tradeoffs.
>>>
>>> - Standardization and usability
>>> Future is a widely used and understood Java API. Given that the functionality RecordSend hopes to provide is essentially that of Future, I think it makes sense to expose a widely understood public API for our clients. RecordSend, on the other hand, seems to provide some APIs that are very similar to those of Future, in addition to exposing a bunch of APIs that belong to ProduceRequestResult. As a user, I would've really preferred to deal with ProduceRequestResult directly:
>>>     Future<ProduceRequestResult> send(...)
>>>
>>> - Error handling
>>> RecordSend's error handling is quite unintuitive: the user has to remember to invoke hasError and error, instead of just having the exception thrown. There are some downsides regarding error handling with the Future as well, where the user has to catch InterruptedException when we would never run into it. However, it seems like a price worth paying for supporting a standard API and error handling.
>>>
>>> - Unused APIs
>>> This is a downside of using Future: the cancel() operation would always return false and mean nothing. But we can mention that caveat in our Java docs.
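The two error-handling styles debated here can be compared on top of a standard future. This sketch uses `java.util.concurrent.CompletableFuture<Long>` as a stand-in for whatever `send()` would return. `SendStyles` and its methods are hypothetical helpers, not part of any Kafka API.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

final class SendStyles {
    // Standard Future style: both checked exceptions must be handled,
    // even in code that is never interrupted.
    static long offsetViaFuture(Future<Long> send) {
        try {
            return send.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("interrupted while waiting for the send", e);
        } catch (ExecutionException e) {
            // The actual send failure arrives wrapped as the cause.
            throw new IllegalStateException("send failed", e.getCause());
        }
    }

    // hasError style, emulated on the same future: wait for completion and
    // report failure as a boolean instead of throwing.
    static boolean hasError(CompletableFuture<Long> send) {
        try {
            send.join();
            return false;
        } catch (CompletionException | CancellationException e) {
            return true;
        }
    }
}
```

The `Future` version forces every caller to decide what an `InterruptedException` means, even where interruption cannot happen; that is the cost Jay objects to. The `hasError` version trades it for an extra call the user must remember, which is Neha's concern.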
>>>
>>> To summarize, I would prefer to expose a well-understood and widely adopted Java API and put up with the overhead of catching one unnecessary checked exception, rather than wrap the useful ProduceRequestResult in a custom async object (RecordSend) and explain that to our many users.
>>>
>>> Thanks,
>>> Neha
>>>
>>>> On Tue, Jan 28, 2014 at 8:10 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>
>>>> Hey Neha,
>>>>
>>>> Can you elaborate on why you prefer using Java's Future? The downside in my mind is the use of the checked InterruptedException and ExecutionException. ExecutionException is arguable, but forcing you to catch InterruptedException, often in code that can't be interrupted, seems perverse. It also leaves us with the cancel() method, which I don't think we really can implement.
>>>>
>>>> Option 1A, to recap/elaborate, was the following:
>>>> - There is no Serializer or Partitioner API. We take a byte[] key and value and an optional integer partition.
>>>> - If you specify the integer partition, it will be used.
>>>> - If you specify neither a key nor a partition, the partition will be chosen in a round-robin fashion.
>>>> - If you specify a key but no partition, we will choose a partition based on a hash of the key.
>>>> In order to let the user find the partition, we will need to give them access to the Cluster instance directly from the producer.
>>>>
>>>> -Jay
>>>>
>>>>> On Tue, Jan 28, 2014 at 6:25 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>>>>
>>>>> Here are more thoughts on the public APIs -
>>>>>
>>>>> - I suggest we use Java's Future instead of a custom Future, especially since it is part of the public API.
>>>>>
>>>>> - Serialization: I like the simplicity of the producer APIs with the absence of serialization, where we just deal with byte arrays for keys and values.
>>>>> What I don't like about this is the performance overhead on the Partitioner for any kind of custom partitioning based on the partitionKey. Since the only purpose of partitionKey is to do custom partitioning, why can't we take it in directly as an integer and let the user figure out the mapping from partition_key -> partition_id using the getCluster() API? If I understand correctly, this is similar to what you suggested as part of option 1A. I like this approach because it keeps the APIs simple by letting us deal with bytes, and it does not compromise performance in the custom partitioning case.
>>>>>
>>>>> Thanks,
>>>>> Neha
>>>>>
>>>>>> On Tue, Jan 28, 2014 at 5:42 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>>>
>>>>>> Hey Tom,
>>>>>>
>>>>>> That sounds cool. How did you end up handling parallel I/O if you wrap the individual connections? Don't you need some selector that selects over all the connections?
>>>>>>
>>>>>> -Jay
>>>>>>
>>>>>>> On Tue, Jan 28, 2014 at 2:31 PM, Tom Brown <tombrow...@gmail.com> wrote:
>>>>>>>
>>>>>>> I implemented a 0.7 client in pure Java, and its API very closely resembled this. (When multiple people independently engineer the same solution, it's probably good... right?) However, there were a few architectural differences with my client:
>>>>>>>
>>>>>>> 1. The basic client itself was just an asynchronous layer around the different server functions. In and of itself it had no knowledge of partitions, only servers (and maintained TCP connections to them).
>>>>>>>
>>>>>>> 2. The main producer was an additional layer that provided a high-level interface that could batch individual messages based on partition.
>>>>>>>
>>>>>>> 3. Knowledge of partitioning was provided via an interface so that different strategies could be used.
>>>>>>>
>>>>>>> 4. Partitioning was done by the user, with knowledge of the available partitions provided by #3.
>>>>>>>
>>>>>>> 5. Serialization was done by the user to simplify the API.
>>>>>>>
>>>>>>> 6. Futures were used to make asynchronous calls emulate synchronous ones.
>>>>>>>
>>>>>>> The main benefit of this approach is flexibility. For example, since the base client was just a managed connection (and not inherently a producer), it was easy to combine a produce request and an offsets request into a confirmed produce request (officially not available in 0.7).
>>>>>>>
>>>>>>> Decoupling the basic client from partition management allowed me to implement ZK discovery as a separate project, so that the main project had no complex dependencies. The same was true of decoupling serialization. It's trivial to build an optional layer that adds those features in, while still allowing access to the base APIs for those who need it.
>>>>>>>
>>>>>>> Using standard Future objects was also beneficial, since I could combine them with existing tools (such as Guava).
>>>>>>>
>>>>>>> It may be too late to be of use, but I have been working with my company's legal department to release the implementation I described above. If you're interested in it, let me know.
>>>>>>>
>>>>>>> To sum up my thoughts regarding the new API, I think it's a great start. I would like to see a more layered approach so I can use the parts I want and adapt the other parts as needed. I would also like to see standard interfaces (especially Future) used where they make sense.
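Jay recapped Option 1A's partition-selection rules above, and Neha and Tom describe similar user-side partitioning. The rules could look roughly like the sketch below, under stated assumptions:
- the class name is hypothetical;
- `numPartitions` would come from the cluster metadata the producer exposes;
- `Arrays.hashCode` is an arbitrary illustrative hash, not the function Kafka uses.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the Option 1A rules; not an actual Kafka class.
final class Option1APartitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    int partition(Integer explicitPartition, byte[] key, int numPartitions) {
        if (numPartitions <= 0) throw new IllegalArgumentException("no partitions");
        if (explicitPartition != null) {
            // 1. An explicit partition always wins.
            if (explicitPartition < 0 || explicitPartition >= numPartitions) {
                throw new IllegalArgumentException("invalid partition " + explicitPartition);
            }
            return explicitPartition;
        }
        if (key != null) {
            // 2. Key but no partition: hash the key bytes, masking the sign
            //    bit so the modulo result is never negative.
            return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
        }
        // 3. Neither key nor partition: round robin.
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}
```

Because the keyed path depends only on the key bytes and the partition count, the same key always maps to the same partition as long as the partition count does not change.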
>>>>>>>
>>>>>>> --Tom
>>>>>>>
>>>>>>>> On Tue, Jan 28, 2014 at 1:33 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> +1 ListenableFuture: If this works similarly to Deferreds in Twisted Python or Promised IO in JavaScript, I think this is a great pattern for decoupling your callback logic from the place where the Future is generated. You can register as many callbacks as you like, each in the appropriate layer of the code, and have each observer get notified when the promised I/O is complete without any of them knowing about each other.
>>>>>>>>
>>>>>>>>> On Tue, Jan 28, 2014 at 11:32 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hey Ross,
>>>>>>>>>
>>>>>>>>> - ListenableFuture: Interesting. That would be an alternative to the direct callback support we provide. There could be pros to this; let me think about it.
>>>>>>>>> - We could provide layering, but I feel that serialization is such a small thing that we should just make a decision and choose one. It doesn't seem to me to justify a whole public-facing layer.
>>>>>>>>> - Yes, this is fairly esoteric. Essentially I think it is fairly similar to databases like DynamoDB that allow you to specify two partition keys (I think DynamoDB does this...). The reasoning is that there are in fact several things you can use the key field for: (1) to compute the partition to store the data in, and (2) as a unique identifier to deduplicate that partition's records within a log.
>>>>>>>>> These two things are almost always the same. Occasionally they may differ, when you want to group data in a more sophisticated way than just a hash of the primary key but still retain the proper primary key for delivery to the consumer and for log compaction.
>>>>>>>>>
>>>>>>>>> -Jay
>>>>>>>>>
>>>>>>>>>> On Tue, Jan 28, 2014 at 3:24 AM, Ross Black <ross.w.bl...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Jay,
>>>>>>>>>>
>>>>>>>>>> - Just to add some more info/confusion about possibly using Future...
>>>>>>>>>> If Kafka uses a JDK future, it plays nicely with other frameworks as well. Google Guava has a ListenableFuture that allows callback handling to be added via the returned future, and allows the callbacks to be passed off to a specified executor.
>>>>>>>>>> http://docs.guava-libraries.googlecode.com/git-history/release/javadoc/com/google/common/util/concurrent/ListenableFuture.html
>>>>>>>>>> The JDK future can easily be converted to a listenable future.
>>>>>>>>>>
>>>>>>>>>> - On the question of byte[] vs Object, could this be solved by layering the API? E.g. a raw producer (use byte[] and specify the partition number) and a normal producer (use a generic object and specify a Partitioner)?
>>>>>>>>>>
>>>>>>>>>> - I am confused by the keys in ProducerRecord and Partitioner. What is the usage for both a key and a partition key?
>>>>>>>>>> (I am not yet using 0.8.)
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Ross
>>>>>>>>>>
>>>>>>>>>>> On 28 January 2014 05:00, Xavier Stevens <xav...@gaikai.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> AutoCloseable would be nice for us, as most of our code is using Java 7 at this point.
>>>>>>>>>>>
>>>>>>>>>>> I like Dropwizard's configuration mapping to POJOs via Jackson, but if you wanted to stick with property maps I don't care enough to object.
>>>>>>>>>>>
>>>>>>>>>>> If the producer only dealt with bytes, is there a way we could still do partition plugins without specifying the number explicitly? I would prefer to be able to pass in field(s) that would be used by the partitioner. Obviously if this wasn't possible you could always deserialize the object in the partitioner and grab the fields you want, but that seems really expensive to do on every message.
>>>>>>>>>>>
>>>>>>>>>>> It would also be nice to have a Java API Encoder constructor taking in VerifiableProperties. Scala understands how to handle "props: VerifiableProperties = null", but Java doesn't, so you don't run into this problem until runtime.
>>>>>>>>>>>
>>>>>>>>>>> -Xavier
>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Jan 27, 2014 at 9:37 AM, Clark Breyman <cl...@breyman.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Jay -
>>>>>>>>>>>>
>>>>>>>>>>>> Config - your explanation makes sense. I'm just so accustomed to having Jackson automatically map my configuration objects to POJOs that I've stopped using property files. They are the lingua franca.
>>>>>>>>>>>> My only thought might be to separate the config interface from the implementation to allow for alternatives, but that might undermine your point of "do it this way so that everyone can find it where they expect it".
>>>>>>>>>>>>
>>>>>>>>>>>> Serialization: Of the options, I like 1A the best, though possibly with either an option to specify a partition key rather than an ID, or a helper to translate an arbitrary byte[] or long into a partition number.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Clark
>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Jan 26, 2014 at 9:13 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the detailed thoughts. Let me elaborate on the config thing.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I agree that at first glance key-value strings don't seem like a very good configuration API for a client. Surely a well-typed config class would be better! I actually disagree; let me see if I can convince you.
>>>>>>>>>>>>>
>>>>>>>>>>>>> My reasoning has nothing to do with the API and everything to do with operations.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Clients are embedded in applications which are themselves configured. In any place that takes operations seriously, the configuration for these applications will be version controlled and maintained through some kind of config management system.
>>>>>>>>>>>>> If we give a config class with getters and setters, the application has to expose those properties to its configuration. What invariably happens is that the application exposes only a choice few properties that its authors thought they would change. Furthermore, the application will make up a name for these configs that seems intuitive at the time, in the 2 seconds the engineer spends thinking about it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Now consider the result of this in the large. You end up with dozens or hundreds of applications that have the client embedded. Each exposes a different, inadequate subset of the possible configs, each with different names. It is a nightmare.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If you use a string-string map, the config system can directly get a bundle of config key-value pairs and put them into the client. This means that all configuration is automatically available, under the name documented on the website, in every application that does this. If you upgrade to a new Kafka version with more configs, those will be exposed too. If you realize that you need to change a default, you can just go through your configs and change it everywhere, as it will have the same name everywhere.
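Jay's pass-through argument can be sketched with `java.util.Properties`. The application loads its config file and hands the whole bundle to the client, without deciding which settings to expose or what to call them. `StringMapClientConfig`, `AppBootstrap`, and the `example.*` keys are hypothetical; a real client would use its own documented key names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical client config backed by a plain string map.
final class StringMapClientConfig {
    // Illustrative defaults; a real client would document every key publicly.
    private static final Map<String, String> DEFAULTS =
            Map.of("example.batch.size", "16384", "example.linger.ms", "0");

    private final Map<String, String> values = new HashMap<>(DEFAULTS);

    StringMapClientConfig(Properties props) {
        // Every key the operator sets is copied as-is, so the application
        // never chooses which settings to expose or what to call them.
        for (String name : props.stringPropertyNames()) {
            values.put(name, props.getProperty(name));
        }
    }

    String get(String key) {
        return values.get(key);
    }

    int getInt(String key) {
        return Integer.parseInt(values.get(key));
    }
}

final class AppBootstrap {
    // The application loads its own config source (a string here for brevity)
    // and passes the whole bundle to the client without inspecting it.
    static StringMapClientConfig load(String fileContents) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(fileContents));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new StringMapClientConfig(props);
    }
}
```

Unknown keys flow through unchanged. A setting added in a newer client version therefore reaches the client without any change to the application code.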
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Jay
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Jan 26, 2014 at 4:47 PM, Clark Breyman <cl...@breyman.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Jay. I'll see if I can put together a more complete response, perhaps as separate threads so that topics don't get entangled. In the meantime, here are a couple of responses:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Serialization: you've broken out a sub-thread, so I'll reply there. My bias is that I like generics (except for type erasure), and in particular they make it easy to compose serializers for compound payloads (e.g. when a common header wraps a payload of parameterized type). I'll respond to your 4-options message with an example.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Build: I've seen a lot of "maven-compatible" build systems produce "artifacts" that aren't really artifacts: no embedded POM or, worse, a malformed POM. I know the sbt-generated artifacts were this way; the onus is on me to see what Gradle is spitting out and what a Maven build might look like. Maven may be old and boring, but it gets out of the way and integrates really seamlessly with a lot of IDEs. When some Scala projects I was working on in the fall of 2011 switched from sbt to Maven, the build became a non-issue.
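Clark's point about composing serializers with generics can be sketched as a header wrapper parameterized on the payload type. `TypedSerializer`, `Utf8Serializer`, and `EnvelopeSerializer` are hypothetical names. The header layout (a 4-byte version followed by a 4-byte payload length) is an assumption for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical serializer interface, parameterized on the value type.
interface TypedSerializer<T> {
    byte[] serialize(T value);
}

final class Utf8Serializer implements TypedSerializer<String> {
    public byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }
}

// Wraps any payload serializer in a common header: a 4-byte version and a
// 4-byte payload length, followed by the payload bytes.
final class EnvelopeSerializer<T> implements TypedSerializer<T> {
    private final int version;
    private final TypedSerializer<T> payload;

    EnvelopeSerializer(int version, TypedSerializer<T> payload) {
        this.version = version;
        this.payload = payload;
    }

    public byte[] serialize(T value) {
        byte[] body = payload.serialize(value);
        return ByteBuffer.allocate(8 + body.length)
                .putInt(version)
                .putInt(body.length)
                .put(body)
                .array();
    }
}
```

The compiler checks that the envelope and its payload agree on `T`. With a reflection-loaded serializer configured by class name, that check is not available, which is the trade-off Clark is pointing at.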
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Config: Not a big deal, and no, I don't think a Dropwizard dependency is appropriate. I do like using simple entity beans (POJOs, not J2EE) for configuration, especially if Jackson can marshal them without annotations. I only mentioned dropwizard-extras because it has some entity-bean versions of the ZK and Kafka configs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Domain-packaging: Also not a big deal - it's what's expected, and it's pretty free in most IDEs. The advantage I see is that it is clear whether something is from the Apache Kafka project, or is from another org and merely related to Kafka. That said, nothing really enforces it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Futures: I'll see if I can create some examples to demonstrate Future making interop easier.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> C
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Jan 24, 2014 at 4:36 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hey Clark,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - Serialization: Yes, I agree with these, though I don't consider the loss of generics a big issue. I'll try to summarize what I would consider the best alternative API with raw byte[].
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - Maven: We had this debate a few months back and the consensus was Gradle.
>>>>>>>>>>>>>>> Is there a specific issue with the POMs Gradle makes? I am extremely loath to revisit the issue: build issues are a recurring thing, no one ever agrees, and ultimately our build needs are very simple.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - Config: I'm not sure I follow the point. Are you saying we should use something in Dropwizard for config? One principle here is to remove as many client dependencies as possible, as we inevitably run into terrible compatibility issues with users who use the same library or its dependencies at different versions. Or are you talking about maintaining compatibility with existing config parameters? Wherever a config in the existing client still makes sense, I think it should keep the same name (I was a bit sloppy about that, so I'll fix any errors there). There are a few new things, and we should give those reasonable defaults. I think config is important, so I'll start a thread on the config package in there.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - org.apache.kafka: We could do this. I always considered it kind of an odd thing Java programmers do that has no real motivation (but I could be re-educated!).
>>>>>>>>>>>>>>> I don't think it ends up reducing naming conflicts in practice, and it adds a lot of noise and nested directories. Is there a reason you prefer this, or is it just to be more standard?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - Future: Basically I didn't see any particular advantage. The cancel() method doesn't really make sense, so it probably wouldn't work. Likewise, I dislike the checked exceptions it requires. Basically I just wrote out some code examples, and they seemed cleaner with a special-purpose object. I wasn't actually aware of plans for improved futures in Java 8 or the other integrations. Maybe you could elaborate on this a bit and show how it would be used? Sounds promising; I just don't know a lot about it.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Jay
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Jan 24, 2014 at 3:30 PM, Clark Breyman <cl...@breyman.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Jay - Thanks for the call for comments. Here's some initial input:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> - Make message serialization a client responsibility (making all messages byte[]). Reflection-based loading makes it harder to use generic codecs (e.g. Envelope<PREFIX, DATA, SUFFIX>) or to build up a codec programmatically. Non-default partitioning should require an explicit partition key.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> - I really like the fact that it will be native Java. Please consider using native Maven and not sbt, Gradle, Ivy, etc., as they don't reliably play nice in the Maven ecosystem. A jar without a well-formed POM doesn't feel like a real artifact, and the POMs generated by sbt et al. are not well formed. Using Maven will make builds and IDE integration much smoother.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> - Look at Nick Telford's dropwizard-extras package, in which he defines some Jackson-compatible POJOs for loading configuration. Your client migration seems similar. The config objects should have constructors or factories that accept Map<String, String> and Properties for ease of migration.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> - Would you consider using the org.apache.kafka package for the new API? (quibble)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> - Why create your own futures rather than use java.util.concurrent.Future<Long> or similar? Standard futures will play nice with other reactive libs and things like J8's CompletableFuture.
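Clark argues that standard futures interoperate better, and Roger points out that independent observers can each attach to the same result. Both ideas are sketched below using Java 8's `CompletableFuture` as a stand-in for a listenable future such as Guava's `ListenableFuture`. The sketch assumes, hypothetically, that `send()` returns a `CompletableFuture<Long>` holding the offset. `SendObservers` is not a real API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class SendObservers {
    // A logging layer observes both success and failure.
    static void attachLogging(CompletableFuture<Long> send, List<String> log) {
        send.whenComplete((offset, error) ->
                log.add(error == null ? "logged offset " + offset : "logged failure"));
    }

    // A metrics layer only cares about successful acknowledgements.
    static void attachMetrics(CompletableFuture<Long> send, List<String> log) {
        send.thenAccept(offset -> log.add("metrics ack"));
    }

    // Composition: derive a new future from the send result.
    static CompletableFuture<String> describe(CompletableFuture<Long> send) {
        return send.thenApply(offset -> "stored at offset " + offset);
    }
}
```

Each layer registers its callback without knowing about the others, and `thenApply` derives a new future from the result. With the non-async variants used here, callbacks registered before completion may run on the thread that completes the future. A real producer would therefore want to keep them cheap, or use the `...Async` variants with an executor.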
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks again,
>>>>>>>>>>>>>>>> C
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Jan 24, 2014 at 2:46 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> A couple of comments:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1) Why does the config use a broker list instead of discovering the brokers in ZooKeeper? It doesn't match the HighLevelConsumer API.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2) It looks like broker connections are created on demand. I'm wondering if sometimes you might want to flush out config or network connectivity issues before pushing the first message through.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Should there also be a KafkaProducer.connect() or .open() method, or connectAll()? I guess it would try to connect to all brokers in the BROKER_LIST_CONFIG.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> HTH,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Roger
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Jan 24, 2014 at 11:54 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> As mentioned in a previous email, we are working on a re-implementation of the producer. I would like to use this email thread to discuss the details of the public API and the configuration.
>>>>>>>>>>>>>>>>>> I would love for us to be incredibly picky about this public API now, so that it is as good as possible and we don't need to break it in the future.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The best way to get a feel for the API is actually to take a look at the javadoc; my hope is to get the API docs good enough that they are self-explanatory:
>>>>>>>>>>>>>>>>>> http://empathybox.com/kafka-javadoc/index.html?kafka/clients/producer/KafkaProducer.html
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Please take a look at this API and give me any thoughts you may have!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> It may also be reasonable to take a look at the configs:
>>>>>>>>>>>>>>>>>> http://empathybox.com/kafka-javadoc/kafka/clients/producer/ProducerConfig.html
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The actual code is posted here:
>>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-1227
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> A few questions or comments to kick things off:
>>>>>>>>>>>>>>>>>> 1. We need to decide whether serialization of the user's key and value should be done by the user (with our API just taking byte[]), or whether we should take an object and let the user configure a Serializer class, which we instantiate via reflection. We take the latter approach in the current producer, and I have carried this through to this prototype. The tradeoff I see is this: taking byte[] is actually simpler, since the user can directly do whatever serialization they like. The complication is actually partitioning. Currently partitioning is done by a similar plug-in API (Partitioner), which the user can implement and configure to override how partitions are assigned. If we take byte[] as input, then we have no access to the original object, and partitioning MUST be done on the byte[]. This is fine for hash partitioning. However, for various types of semantic partitioning (range partitioning, or whatever) you would want access to the original object.
In the current approach a >>>>> producer >>>>>>> who >>>>>>>>>>> wishes >>>>>>>>>>>> to >>>>>>>>>>>>>>> send >>>>>>>>>>>>>>>>>> byte[] they have serialized in their own >>> code >>>> can >>>>>>>>> configure >>>>>>>>>>> the >>>>>>>>>>>>>>>>>> BytesSerialization we supply which is just a >>>> "no >>>>>> op" >>>>>>>>>>>>> serialization. >>>>>>>>>>>>>>>>>> 2. We should obsess over naming and make >>> sure >>>>> each >>>>>> of >>>>>>>> the >>>>>>>>>>> class >>>>>>>>>>>>>> names >>>>>>>>>>>>>>>> are >>>>>>>>>>>>>>>>>> good. >>>>>>>>>>>>>>>>>> 3. Jun has already pointed out that we need >>> to >>>>>>> include >>>>>>>>> the >>>>>>>>>>>> topic >>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>> partition in the response, which is >>> absolutely >>>>>>> right. I >>>>>>>>>>> haven't >>>>>>>>>>>>>> done >>>>>>>>>>>>>>>> that >>>>>>>>>>>>>>>>>> yet but that definitely needs to be there. >>>>>>>>>>>>>>>>>> 4. Currently RecordSend.await will throw an >>>>>> exception >>>>>>>> if >>>>>>>>>> the >>>>>>>>>>>>>> request >>>>>>>>>>>>>>>>>> failed. The intention here is that >>>>>>>>>>>> producer.send(message).await() >>>>>>>>>>>>>>>> exactly >>>>>>>>>>>>>>>>>> simulates a synchronous call. Guozhang has >>>> noted >>>>>> that >>>>>>>>> this >>>>>>>>>>> is a >>>>>>>>>>>>>>> little >>>>>>>>>>>>>>>>>> annoying since the user must then catch >>>>> exceptions. >>>>>>>>> However >>>>>>>>>>> if >>>>>>>>>>>> we >>>>>>>>>>>>>>>> remove >>>>>>>>>>>>>>>>>> this then if the user doesn't check for >>> errors >>>>> they >>>>>>>> won't >>>>>>>>>>> know >>>>>>>>>>>>> one >>>>>>>>>>>>>>> has >>>>>>>>>>>>>>>>>> occurred, which I predict will be a common >>>>> mistake. >>>>>>>>>>>>>>>>>> 5. Perhaps there is more we could do to make >>>> the >>>>>>> async >>>>>>>>>>>> callbacks >>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>> future >>>>>>>>>>>>>>>>>> we give back intuitive and easy to program >>>>> against? 
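Here is a minimal Java sketch that makes the serialization/partitioning tradeoff in point 1 concrete. It assumes a pluggable serializer interface. The names Serializer, BytesSerializer, LongSerializer, hashPartition and rangePartition are hypothetical illustrations, not the prototype's actual classes or signatures. Hash partitioning needs only the serialized bytes. Range partitioning on, say, a numeric user id needs the original key, or would have to undo the serialization first.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class PartitioningSketch {
    // Hypothetical plug-in serializer interface (illustration only, not the prototype's API).
    interface Serializer<T> {
        byte[] serialize(T value);
    }

    // "No op" serializer for callers who already hold byte[], in the spirit of BytesSerialization.
    static final class BytesSerializer implements Serializer<byte[]> {
        public byte[] serialize(byte[] value) {
            return value;
        }
    }

    // Example object serializer: a long key written as 8 big-endian bytes.
    static final class LongSerializer implements Serializer<Long> {
        public byte[] serialize(Long value) {
            return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
        }
    }

    // Hash partitioning: works on the serialized bytes alone.
    static int hashPartition(byte[] keyBytes, int numPartitions) {
        // Clear the sign bit so the result of % is never negative.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // Range partitioning: needs the original key value, here a user id in [0, maxUserId).
    // Assumes maxUserId > 0 and numPartitions > 0.
    static int rangePartition(long userId, long maxUserId, int numPartitions) {
        long rangeSize = (maxUserId + numPartitions - 1) / numPartitions; // ceiling division
        return (int) Math.min(userId / rangeSize, numPartitions - 1);
    }

    public static void main(String[] args) {
        long userId = 42L;
        byte[] keyBytes = new LongSerializer().serialize(userId);

        // With an object API, the producer holds both the key object and its bytes,
        // so either partitioning strategy can be plugged in.
        System.out.println("hash partition:  " + hashPartition(keyBytes, 4));
        System.out.println("range partition: " + rangePartition(userId, 1000L, 4));

        // With a byte[]-only API, a range partitioner would first have to deserialize the key.
        long recovered = ByteBuffer.wrap(keyBytes).getLong();
        System.out.println("recovered key:   " + recovered);
    }
}
```

A pure byte[] API leaves only hashPartition usable without an extra deserialization step. Taking objects plus a configured Serializer keeps both options open, at the cost of the reflection-based configuration described in point 1.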

Some background info on the implementation:

At a high level, the primary difference in this producer is that it removes the distinction between the "sync" and "async" producer. Effectively, all requests are sent asynchronously, but every send returns a future response object. When the request completes, that object gives the offset, as well as any error that may have occurred. The batching that today is done only in the async producer is now done whenever possible. This means that the sync producer, under load, can get performance as good as the async producer (preliminary results show the producer getting 1 million messages/sec). This works similarly to group commit in databases, but with respect to the actual network transmission: any messages that arrive while a send is in progress are batched together.
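The model described above (every send is asynchronous, but each one returns a future, and records that arrive during a send are batched) can be sketched in Java as follows. This is a minimal, single-partition illustration built on several assumptions. A background thread stands in for the network layer. Offsets are assigned locally instead of by a broker. An empty record simulates a failed request. lingerMs plays the role of the linger delay. None of these names come from the prototype.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical single-partition sketch; not the KafkaProducer prototype's code.
public class BatchingSketch implements AutoCloseable {
    // A record waiting to be sent, paired with the future returned to the caller.
    private static final class Pending {
        final byte[] value;
        final CompletableFuture<Long> result = new CompletableFuture<>();
        Pending(byte[] value) { this.value = value; }
    }

    private final List<Pending> accumulator = new ArrayList<>();
    private final long lingerMs;
    private final Thread sender;
    private volatile boolean running = true;
    private long nextOffset = 0; // assigned locally; stands in for the broker's log end offset
    private int batchesSent = 0;

    public BatchingSketch(long lingerMs) {
        this.lingerMs = lingerMs;
        this.sender = new Thread(this::runSender, "sender");
        this.sender.setDaemon(true);
        this.sender.start();
    }

    // Never blocks: the record simply joins whatever batch is currently accumulating.
    public synchronized Future<Long> send(byte[] value) {
        Pending p = new Pending(value);
        accumulator.add(p);
        notifyAll(); // wake the sender if it is idle
        return p.result;
    }

    public synchronized int batchesSent() { return batchesSent; }

    private void runSender() {
        try {
            while (running) {
                synchronized (this) {
                    while (running && accumulator.isEmpty()) wait();
                }
                // Linger so more records can join the batch (similar in spirit to Nagle's algorithm).
                if (lingerMs > 0) Thread.sleep(lingerMs);
                List<Pending> batch;
                synchronized (this) {
                    batch = new ArrayList<>(accumulator);
                    accumulator.clear();
                    if (!batch.isEmpty()) batchesSent++;
                }
                // One simulated network round trip; records sent meanwhile wait for the next batch.
                transmit(batch);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // close() was called
        }
    }

    private void transmit(List<Pending> batch) {
        for (Pending p : batch) {
            if (p.value.length == 0) {
                // Simulated per-record failure; Future.get() rethrows it wrapped in ExecutionException.
                p.result.completeExceptionally(new IllegalArgumentException("empty record"));
            } else {
                p.result.complete(nextOffset++);
            }
        }
    }

    @Override
    public void close() {
        running = false;
        sender.interrupt();
    }

    public static void main(String[] args) throws Exception {
        try (BatchingSketch producer = new BatchingSketch(5)) {
            // Async style: fire off several sends, then collect the offsets.
            List<Future<Long>> futures = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                futures.add(producer.send(("msg-" + i).getBytes(StandardCharsets.UTF_8)));
            }
            for (Future<Long> f : futures) System.out.println("offset " + f.get());
            // Sync style: send(...).get() acts like a blocking call and surfaces the failure.
            try {
                producer.send(new byte[0]).get();
            } catch (ExecutionException e) {
                System.out.println("send failed: " + e.getCause().getMessage());
            }
            System.out.println("batches sent: " + producer.batchesSent());
        }
    }
}
```

In this sketch, calling send(...).get() blocks until the record's batch has been transmitted. It also rethrows any failure wrapped in an ExecutionException, which mirrors the "exactly simulates a synchronous call" intent of RecordSend.await() in point 4. How many batches the ten async sends end up in depends on timing, but their offsets are always assigned in send order.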

It is also possible to encourage batching even under low load, to save server resources. This works by introducing a delay on the send that allows more messages to accumulate, configured with the linger.ms config (this is similar to Nagle's algorithm in TCP).

This producer does all network communication asynchronously and in parallel to all servers, so the performance penalty for acks=-1 and waiting on replication should be much reduced. I haven't done much benchmarking on this yet, though.

The high-level design is described a little here, though this is now a little out of date:
https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite

-Jay