Hey Guozhang,

Good point about unifying the handling of MetadataResponse and other ClientResponses.
Thanks for all the feedback! I’ll take a shot and upload a patch for review.

Jiangjie (Becket) Qin

On 5/6/15, 2:19 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:

>Jiangjie,
>
>Just trying to figure out the class reference hierarchy of this approach
>("A <- B" means A either has a member variable of B or takes B as an API
>parameter).
>
>Metadata will have an interface that takes in KafkaClient as a parameter, so
>Metadata <- KafkaClient
>
>1. For producer:
>
>KafkaProducer <- Sender, KafkaProducer <- Metadata, Sender <- KafkaClient,
>Sender <- Metadata
>
>2. For consumer:
>
>KafkaConsumer <- Coordinator, KafkaConsumer <- Fetcher, KafkaConsumer <-
>KafkaClient, KafkaConsumer <- Metadata, Coordinator <- KafkaClient,
>Coordinator <- Metadata, Fetcher <- KafkaClient, Fetcher <- Metadata
>
>3. For controller:
>
>KafkaController <- KafkaClient
>
>4. For replica fetcher:
>
>ReplicaFetcher <- KafkaClient
>
>For producer / consumer, the interleaving seems a bit complicated to me.
>Instead, we could completely remove the concept of Metadata from
>KafkaClient, such that NetworkClient.handleCompletedReceives does not
>specifically handle metadata responses, but just calls
>response.request().callback().onComplete(response) as well, which will try
>to update the metadata and check for any errors.
>
>Guozhang
>
>On Wed, May 6, 2015 at 10:40 AM, Jiangjie Qin <j...@linkedin.com.invalid>
>wrote:
>
>> Jun & Ewen,
>>
>> Thanks a lot for the comments. Both of them look better than my original
>> plan.
>>
>> Jun, one downside of not changing NetworkClient but using a different
>> metadata implementation is that NetworkClient will still have all the
>> metadata-related code there, which makes it a little bit weird. I think
>> Ewen's approach solves this problem.
>>
>> Ewen, if I understand correctly, you are proposing something similar to
>> the structure we are using in
>> AbstractFetcherThread/ConsumerFetcherThread/ReplicaFetcherThread.
>> That will make NetworkClient look clean but increase the layers, which is
>> probably OK.
>>
>> Inspired by your suggestions, I have another thought which seems closer to
>> Jun's idea. What if we move maybeUpdateMetadata()/handleMetadataResponse()
>> and related logic in NetworkClient to metadata and pass in NetworkClient
>> as an argument? Like Jun suggested, we need a Metadata interface and
>> different implementations.
>>
>> Thanks.
>>
>> Jiangjie (Becket) Qin
>>
>>
>>
>> On 5/5/15, 11:31 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
>>
>> >+1 on trying to reuse the NetworkClient code.
>> >
>> >I think Jun's approach could work, but I'm wondering if refactoring a bit
>> >could get better separation of concerns without a somewhat awkward nop
>> >implementation of Metadata. I'm not sure what combination of delegation or
>> >subclassing makes sense yet, but here's another approach that I think
>> >could work:
>> >
>> >* Get rid of metadata stuff from NetworkClient. Add a subclass that also
>> >manages all the metadata. (Since it's used for both producer and consumer,
>> >the obvious name that I first jumped to is ClientNetworkClient, but
>> >somehow I think we can come up with something less confusing.)
>> >* leastLoadedNode is the only caller of metadata.fetch() in that class,
>> >maybeUpdateMetadata is the only caller of leastLoadedNode, and
>> >maybeUpdateMetadata is only called in poll when a combination of
>> >metadata-related timeouts ends up being 0. These can be safely refactored
>> >into the subclass with one override of poll(). Same with
>> >metadataFetchInProgress, assuming the rest of the changes below.
>> >* Some of the default implementations (e.g. handleMetadataResponse) can be
>> >left nops in NetworkClient and moved to the subclass.
>> >* Others can be overridden to call the super method then take the
>> >additional action necessary (e.g., on disconnect, move the metadata update
>> >request to the subclass).
>> >* Making the timeout handling in poll() work for both NetworkClient and
>> >the new base class might be the messiest part and might require breaking
>> >down the implementation of poll into multiple methods.
>> >* isReady uses metadataFetchInProgress and gets a timeout from the
>> >Metadata class. We can just override this method as well, though I feel
>> >like there's probably a cleaner solution.
>> >
>> >-Ewen
>> >
>> >
>> >On Tue, May 5, 2015 at 4:54 PM, Jun Rao <j...@confluent.io> wrote:
>> >
>> >> Hi, Jiangjie,
>> >>
>> >> Thanks for taking on this.
>> >>
>> >> I was thinking that one way to decouple the dependency on Metadata in
>> >> NetworkClient is the following.
>> >> 1. Make Metadata an interface.
>> >> 2. Rename current Metadata class to sth like KafkaMetadata that
>> >> implements the Metadata interface.
>> >> 3. Have a new NoOpMetadata class that implements the Metadata interface.
>> >> This class
>> >> 3.1 does nothing for any write method
>> >> 3.2 returns max long for any method that asks for a timestamp
>> >> 3.3 returns an empty Cluster for fetch().
>> >>
>> >> Then we can leave NetworkClient unchanged and just pass in a
>> >> NoOpMetadata when using NetworkClient in the controller. The
>> >> consumer/producer client will be using KafkaMetadata.
>> >>
>> >> As for replica fetchers, it may be possible to use KafkaConsumer.
>> >> However, we don't need the metadata and the offset management. So,
>> >> perhaps it's easier to just use NetworkClient. Also, currently, there is
>> >> one replica fetcher thread per source broker. By using NetworkClient, we
>> >> can change that to using a single thread for all source brokers. This is
>> >> probably a bigger change. So, maybe we can do it later.
>> >>
>> >> Jun
>> >>
>> >>
>> >> I think we probably need to replace replica fetcher with NetworkClient
>> >> as well.
>> >> Replica fetcher gets leader from the controller and therefore
>> >> doesn't
>> >>
>> >> On Tue, May 5, 2015 at 1:37 PM, Jiangjie Qin <j...@linkedin.com.invalid>
>> >> wrote:
>> >>
>> >> > I am trying to see if we can reuse the NetworkClient class to be used
>> >> > in controller to broker communication. (Also, we can probably use
>> >> > KafkaConsumer which is already using NetworkClient in replica
>> >> > fetchers).
>> >> > Currently NetworkClient does the following things in addition to
>> >> > sending requests.
>> >> >
>> >> > 1. Connection state management.
>> >> > 2. Flow control (inflight requests)
>> >> > 3. Metadata refresh
>> >> >
>> >> > In controller we need (1) and (2) but not (3). NetworkClient is
>> >> > tightly coupled with metadata now and this is the major blocker of
>> >> > reusing NetworkClient in controller. For controller, we don't need
>> >> > NetworkClient to manage any metadata because the controller has
>> >> > listeners to monitor the cluster state and has all the information
>> >> > about topic metadata.
>> >> > I am thinking we can add a disable metadata refresh flag to
>> >> > NetworkClient or set metadata refresh interval to be Long.MAX_VALUE,
>> >> > so the metadata will be managed outside NetworkClient.
>> >> > This needs minimal change to allow NetworkClient to be reused, but the
>> >> > ugly part is NetworkClient still has the entire Metadata while it
>> >> > actually only needs a NodeList.
>> >> >
>> >> > Want to see what do people think about this.
>> >> >
>> >> > Thanks.
>> >> >
>> >> > Jiangjie (Becket) Qin
>> >> >
>> >> >
>> >>
>> >
>> >
>> >
>> >--
>> >Thanks,
>> >Ewen
>>
>>
>
>
>--
>-- Guozhang
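
For reference, Jun's three-step proposal quoted in this thread (make Metadata an interface, keep the real implementation for producer/consumer, pass a no-op implementation in the controller) could be sketched roughly as below. This is only an illustration under assumptions: ClusterSketch, MetadataSketch, NoOpMetadataSketch and all method names are hypothetical stand-ins, not the actual Kafka classes or signatures.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for Kafka's Cluster: it only tracks node ids.
final class ClusterSketch {
    private final List<Integer> nodeIds;

    ClusterSketch(List<Integer> nodeIds) {
        this.nodeIds = Collections.unmodifiableList(new ArrayList<>(nodeIds));
    }

    static ClusterSketch empty() {
        return new ClusterSketch(Collections.<Integer>emptyList());
    }

    List<Integer> nodeIds() {
        return nodeIds;
    }
}

// Step 1: Metadata as an interface. The method set is assumed for illustration.
interface MetadataSketch {
    ClusterSketch fetch();
    long timeToNextUpdate(long nowMs);
    void requestUpdate();
    void update(ClusterSketch cluster, long nowMs);
}

// Step 3: a no-op implementation that a controller-side client could be given.
final class NoOpMetadataSketch implements MetadataSketch {
    // 3.3: fetch() always returns an empty cluster.
    @Override public ClusterSketch fetch() { return ClusterSketch.empty(); }

    // 3.2: time-related queries return max long, so a refresh is never due.
    @Override public long timeToNextUpdate(long nowMs) { return Long.MAX_VALUE; }

    // 3.1: write methods do nothing.
    @Override public void requestUpdate() { }
    @Override public void update(ClusterSketch cluster, long nowMs) { }
}

final class NoOpMetadataDemo {
    public static void main(String[] args) {
        MetadataSketch metadata = new NoOpMetadataSketch();
        // The update is ignored, so the cluster view stays empty.
        metadata.update(new ClusterSketch(Collections.singletonList(1)), 0L);
        System.out.println("nodes known: " + metadata.fetch().nodeIds().size());
        System.out.println("refresh never due: "
                + (metadata.timeToNextUpdate(0L) == Long.MAX_VALUE));
    }
}
```

With something like this, a client that only consults the interface would never see a metadata refresh become due, which is the behavior the controller case asks for. The awkwardness Ewen mentions still shows in the sketch: the no-op class has to implement every method even though none of them does anything useful.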