Hi Damian, After running the example code in mind with these two approaches I'm sold on your arguments :)
Guozhang On Tue, Jul 12, 2016 at 12:49 PM, Sriram Subramanian <r...@confluent.io> wrote: > I liked abstracting the metadata approach as well. It also helps to evolve > in the future. > > On Tue, Jul 12, 2016 at 12:42 PM, Michael Noll <mich...@confluent.io> > wrote: > > > Like Damian I'd also favor a proper type (KafkaStreamsMetadata) rather > than > > a Map-based construct. > > > > -Michael > > > > On Tue, Jul 12, 2016 at 8:45 PM, Damian Guy <damian....@gmail.com> > wrote: > > > > > One more thing on the above, the methods on KafkaStreams should be > > changed > > > to something like: > > > > > > Collection<KafkaStreamsMetadata> allMetadata() > > > > > > Collection<KafkaStreamsMetadata> allMetadataForStore(final String > > > storeName) > > > > > > KafkaStreamsMetadata metadataWithKey(final String storeName, > > > final K key, > > > final Serializer<K> > > > keySerializer) > > > > > > > > > Thanks, > > > Damian > > > > > > On Tue, 12 Jul 2016 at 11:14 Damian Guy <damian....@gmail.com> wrote: > > > > > > > Hi, > > > > > > > > I agree with point 1. application.server is a better name for the > > config > > > > (we'll change this). However, on point 2 I think we should stick > mostly > > > > with what we already have. I've tried both ways of doing this when > > > working > > > > on the JIRA and building examples and I find the current approach > more > > > > intuitive and easier to use than the Map based approach. > > > > However, there is probably a naming issue. We should rename > > > > KafkaStreamsInstance to KafkaStreamsMetadata. This Class is very > > simple, > > > > but provides all the information a developer needs to be able to find > > the > > > > instance(s) of a Streams application that a particular store is > running > > > on, > > > > i.e., > > > > > > > > public class KafkStreamsMetadata { > > > > private final HostInfo hostInfo; > > > > private final Set<String> stateStoreNames; > > > > private final Set<TopicPartition> topicPartitions; > > > > > > > > > > > > So using the API to route to a new host is fairly simple, > particularly > > in > > > > the case when you want to find the host for a particular key, i.e., > > > > > > > > final KafkaStreams kafkaStreams = createKafkaStreams(); > > > > final KafkaStreamsMetadata streamsMetadata = > > > kafkaStreams.instanceWithKey("word-count", "hello", > > > Serdes.String().serializer()); > > > > http.get("http://" + streamsMetadata.host() + ":" + > > > streamsMetadata.port() + "/get/word-count/hello"); > > > > > > > > > > > > And if you want to do a scatter gather approach: > > > > > > > > final KafkaStreams kafkaStreams = createKafkaStreams(); > > > > final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas = > > > kafkaStreams.allInstancesWithStore("word-count"); > > > > for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) { > > > > http.get("http://" + streamsMetadata.host() + ":" + > > > streamsMetadata.port() + "/get/word-count/hello"); > > > > ... > > > > } > > > > > > > > > > > > And if you iterated over all instances: > > > > > > > > final KafkaStreams kafkaStreams = createKafkaStreams(); > > > > final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas = > > > kafkaStreams.allInstances(); > > > > for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) { > > > > if (streamsMetadata.stateStoreNames().contains("word-count")) { > > > > http.get("http://" + streamsMetadata.host() + ":" + > > > streamsMetadata.port() + "/get/word-count/hello"); > > > > ... > > > > } > > > > } > > > > > > > > > > > > If we were to change this to use Map<HostInfo, Set<TaskMetadata>> for > > the > > > > most part users would need to iterate over the entry or key set. > > > Examples: > > > > > > > > The finding an instance by key is a little odd: > > > > > > > > final KafkaStreams kafkaStreams = createKafkaStreams(); > > > > final Map<HostInfo, Set<TaskMetadata>> streamsMetadata = > > > kafkaStreams.instanceWithKey("word-count","hello", > > > Serdes.String().serializer()); > > > > // this is a bit odd as i only expect one: > > > > for (HostInfo hostInfo : streamsMetadata.keySet()) { > > > > http.get("http://" + streamsMetadata.host() + ":" + > > > streamsMetadata.port() + "/get/word-count/hello"); > > > > } > > > > > > > > > > > > The scatter/gather by store is fairly similar to the previous > example: > > > > > > > > final KafkaStreams kafkaStreams = createKafkaStreams(); > > > > final Map<HostInfo, Set<TaskMetadata>> streamsMetadata = > > > kafkaStreams.allInstancesWithStore("word-count"); > > > > for(HostInfo hostInfo : streamsMetadata.keySet()) { > > > > http.get("http://" + hostInfo.host() + ":" + hostInfo.port() + > > > "/get/word-count/hello"); > > > > ... > > > > } > > > > > > > > And iterating over all instances: > > > > > > > > final Map<HostInfo, Set<TaskMetadata>> streamsMetadata = > > > kafkaStreams.allInstances(); > > > > for (Map.Entry<HostInfo, Set<TaskMetadata>> entry : > > > streamsMetadata.entrySet()) { > > > > for (TaskMetadata taskMetadata : entry.getValue()) { > > > > if (taskMetadata.stateStoreNames().contains("word-count")) { > > > > http.get("http://" + streamsMetadata.host() + ":" + > > > streamsMetadata.port() + "/get/word-count/hello"); > > > > ... > > > > } > > > > } > > > > } > > > > > > > > > > > > IMO - having a class we return is the better approach as it nicely > > wraps > > > > the related things, i.e, host:port, store names, topic partitions > into > > an > > > > Object that is easy to use. Further we could add some behaviour to > this > > > > class if we felt it necessary, i.e, hasStore(storeName) etc. > > > > > > > > Anyway, i'm interested in your thoughts. > > > > > > > > Thanks, > > > > Damian > > > > > > > > On Mon, 11 Jul 2016 at 13:47 Guozhang Wang <wangg...@gmail.com> > wrote: > > > > > > > >> 1. Re StreamsConfig.USER_ENDPOINT_CONFIG: > > > >> > > > >> I agree with Neha that Kafka Streams can provide the bare minimum > APIs > > > >> just > > > >> for host/port, and user's implemented layer can provide URL / proxy > > > >> address > > > >> they want to build on top of it. > > > >> > > > >> > > > >> 2. Re Improving KafkaStreamsInstance interface: > > > >> > > > >> Users are indeed aware of "TaskId" class which is not part of > internal > > > >> packages and is exposed in PartitionGrouper interface that can be > > > >> instantiated by the users, which is assigned with input topic > > > partitions. > > > >> So we can probably change the APIs as: > > > >> > > > >> Map<HostState, Set<TaskMetadata>> KafkaStreams.getAllTasks() where > > > >> TaskMetadata has fields such as taskId, list of assigned partitions, > > > list > > > >> of state store names; and HostState can include hostname / port. The > > > port > > > >> is the listening port of a user-defined listener that users provide > to > > > >> listen for queries (e.g., using REST APIs). > > > >> > > > >> Map<HostState, Set<TaskMetadata>> > > KafkaStreams.getTasksWithStore(String > > > /* > > > >> storeName */) would return only the hosts and their assigned tasks > if > > at > > > >> least one of the tasks include the given store name. > > > >> > > > >> Map<HostState, Set<TaskMetadata>> > > > KafkaStreams.getTaskWithStoreAndKey(Key > > > >> k, String /* storeName */, StreamPartitioner partitioner) would > return > > > >> only > > > >> the host and their assigned task if the store with the store name > has > > a > > > >> particular key, according to the partitioner behavior. > > > >> > > > >> > > > >> > > > >> Guozhang > > > >> > > > >> > > > >> On Sun, Jul 10, 2016 at 11:21 AM, Neha Narkhede <n...@confluent.io> > > > >> wrote: > > > >> > > > >> > Few thoughts that became apparent after observing example code of > > what > > > >> an > > > >> > application architecture and code might look like with these > > changes. > > > >> > Apologize for the late realization hence. > > > >> > > > > >> > 1. "user.endpoint" will be very differently defined for respective > > > >> > applications. I don't think Kafka Streams should generalize to > > accept > > > >> any > > > >> > connection URL as we expect to only expose metadata expressed as > > > >> HostInfo > > > >> > (which is defined by host & port) and hence need to interpret the > > > >> > "user.endpoint" as host & port. Applications will have their own > > > >> endpoint > > > >> > configs that will take many forms and they will be responsible for > > > >> parsing > > > >> > out host and port and configuring Kafka Streams accordingly. > > > >> > > > > >> > If we are in fact limiting to host and port, I wonder if we should > > > >> change > > > >> > the name of "user.endpoint" into something more specific. We have > > > >> clients > > > >> > expose host/port pairs as "bootstrap.servers". Should this be > > > >> > "application.server"? > > > >> > > > > >> > 2. I don't think we should expose another abstraction called > > > >> > KafkaStreamsInstance to the user. This is related to the > discussion > > of > > > >> the > > > >> > right abstraction that we want to expose to an application. The > > > >> abstraction > > > >> > discussion itself should probably be part of the KIP itself, let > me > > > >> give a > > > >> > quick summary of my thoughts here: > > > >> > 1. The person implementing an application using Queryable State > has > > > >> likely > > > >> > already made some choices for the service layer–a REST framework, > > > >> Thrift, > > > >> > or whatever. We don't really want to add another RPC framework to > > this > > > >> mix, > > > >> > nor do we want to try to make Kafka's RPC mechanism general > purpose. > > > >> > 2. Likewise, it should be clear that the API you want to expose to > > the > > > >> > front-end/client service is not necessarily the API you'd need > > > >> internally > > > >> > as there may be additional filtering/processing in the router. > > > >> > > > > >> > Given these constraints, what we prefer to add is a fairly > low-level > > > >> > "toolbox" that would let you do anything you want, but requires to > > > route > > > >> > and perform any aggregation or processing yourself. This pattern > is > > > >> > not recommended for all kinds of services/apps, but there are > > > >> definitely a > > > >> > category of things where it is a big win and other advanced > > > applications > > > >> > are out-of-scope. > > > >> > > > > >> > The APIs we expose should take the following things into > > > consideration: > > > >> > 1. Make it clear to the user that they will do the routing, > > > aggregation, > > > >> > processing themselves. So the bare minimum that we want to expose > is > > > >> store > > > >> > and partition metadata per application server identified by the > host > > > and > > > >> > port. > > > >> > 2. Ensure that the API exposes abstractions that are known to the > > user > > > >> or > > > >> > are intuitive to the user. > > > >> > 3. Avoid exposing internal objects or implementation details to > the > > > >> user. > > > >> > > > > >> > So tying all this into answering the question of what we should > > expose > > > >> > through the APIs - > > > >> > > > > >> > In Kafka Streams, the user is aware of the concept of tasks and > > > >> partitions > > > >> > since the application scales with the number of partitions and > tasks > > > are > > > >> > the construct for logical parallelism. The user is also aware of > the > > > >> > concept of state stores though until now they were not user > > > accessible. > > > >> > With Queryable State, the bare minimum abstractions that we need > to > > > >> expose > > > >> > are state stores and the location of state store partitions. > > > >> > > > > >> > For exposing the state stores, the getStore() APIs look good but I > > > think > > > >> > for locating the state store partitions, we should go back to the > > > >> original > > > >> > proposal of simply exposing some sort of getPartitionMetadata() > that > > > >> > returns a PartitionMetadata or TaskMetadata object keyed by > > HostInfo. > > > >> > > > > >> > The application will convert the HostInfo (host and port) into > some > > > >> > connection URL to talk to the other app instances via its own RPC > > > >> mechanism > > > >> > depending on whether it needs to scatter-gather or just query. The > > > >> > application will know how a key maps to a partition and through > > > >> > PartitionMetadata it will know how to locate the server that hosts > > the > > > >> > store that has the partition hosting that key. > > > >> > > > > >> > On Fri, Jul 8, 2016 at 9:40 AM, Michael Noll < > mich...@confluent.io> > > > >> wrote: > > > >> > > > > >> > > Addendum in case my previous email wasn't clear: > > > >> > > > > > >> > > > So for any given instance of a streams application there will > > > never > > > >> be > > > >> > > both a v1 and v2 alive at the same time > > > >> > > > > > >> > > That's right. But the current live instance will be able to > tell > > > >> other > > > >> > > instances, via its endpoint setting, whether it wants to be > > > contacted > > > >> at > > > >> > v1 > > > >> > > or at v2. The other instances can't guess that. Think: if an > > older > > > >> > > instance would manually compose the "rest" of an endpoint URI, > > > having > > > >> > only > > > >> > > the host and port from the endpoint setting, it might not know > > that > > > >> the > > > >> > new > > > >> > > instances have a different endpoint suffix, for example). > > > >> > > > > > >> > > > > > >> > > On Fri, Jul 8, 2016 at 6:37 PM, Michael Noll < > > mich...@confluent.io> > > > >> > wrote: > > > >> > > > > > >> > > > Damian, > > > >> > > > > > > >> > > > about the rolling upgrade comment: An instance A will contact > > > >> another > > > >> > > > instance B by the latter's endpoint, right? So if A has no > > > further > > > >> > > > information available than B's host and port, then how should > > > >> instance > > > >> > A > > > >> > > > know whether it should call B at /v1/ or at /v2/? I agree > that > > my > > > >> > > > suggestion isn't foolproof, but it is afaict better than the > > > >> host:port > > > >> > > > approach. > > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > On Fri, Jul 8, 2016 at 5:15 PM, Damian Guy < > > damian....@gmail.com> > > > >> > wrote: > > > >> > > > > > > >> > > >> Michael - i'm ok with changing it to a string. Any one else > > have > > > a > > > >> > > strong > > > >> > > >> opinion on this? > > > >> > > >> > > > >> > > >> FWIW - i don't think it will work fine as is during the > rolling > > > >> > upgrade > > > >> > > >> scenario as the service that is listening on the port needs > to > > be > > > >> > > embedded > > > >> > > >> within each instance. So for any given instance of a streams > > > >> > application > > > >> > > >> there will never be both a v1 and v2 alive at the same time > > > >> (unless of > > > >> > > >> course the process didn't shutdown properly, but then you > have > > > >> another > > > >> > > >> problem...). > > > >> > > >> > > > >> > > >> On Fri, 8 Jul 2016 at 15:26 Michael Noll < > mich...@confluent.io > > > > > > >> > wrote: > > > >> > > >> > > > >> > > >> > I have one further comment about > > > >> > `StreamsConfig.USER_ENDPOINT_CONFIG`. > > > >> > > >> > > > > >> > > >> > I think we should consider to not restricting the value of > > this > > > >> > > setting > > > >> > > >> to > > > >> > > >> > only `host:port` pairs. By design, this setting is > capturing > > > >> > > >> user-driven > > > >> > > >> > metadata to define an endpoint, so why restrict the > > creativity > > > or > > > >> > > >> > flexibility of our users? I can imagine, for example, that > > > users > > > >> > > would > > > >> > > >> > like to set values such as `https://host:port > /api/rest/v1/` > > in > > > >> this > > > >> > > >> field > > > >> > > >> > (e.g. being able to distinguish between `.../v1/` and > > `.../v2/` > > > >> may > > > >> > > >> help in > > > >> > > >> > scenarios such as rolling upgrades, where, during the > > upgrade, > > > >> older > > > >> > > >> > instances may need to coexist with newer instances). > > > >> > > >> > > > > >> > > >> > That said, I don't have a strong opinion here. > > > >> > > >> > > > > >> > > >> > -Michael > > > >> > > >> > > > > >> > > >> > > > > >> > > >> > > > > >> > > >> > On Fri, Jul 8, 2016 at 2:55 PM, Matthias J. Sax < > > > >> > > matth...@confluent.io> > > > >> > > >> > wrote: > > > >> > > >> > > > > >> > > >> > > +1 > > > >> > > >> > > > > > >> > > >> > > On 07/08/2016 11:03 AM, Eno Thereska wrote: > > > >> > > >> > > > +1 (non-binding) > > > >> > > >> > > > > > > >> > > >> > > >> On 7 Jul 2016, at 18:31, Sriram Subramanian < > > > >> r...@confluent.io> > > > >> > > >> wrote: > > > >> > > >> > > >> > > > >> > > >> > > >> +1 > > > >> > > >> > > >> > > > >> > > >> > > >> On Thu, Jul 7, 2016 at 9:53 AM, Henry Cai > > > >> > > >> <h...@pinterest.com.invalid > > > >> > > >> > > > > > >> > > >> > > >> wrote: > > > >> > > >> > > >> > > > >> > > >> > > >>> +1 > > > >> > > >> > > >>> > > > >> > > >> > > >>> On Thu, Jul 7, 2016 at 6:48 AM, Michael Noll < > > > >> > > >> mich...@confluent.io> > > > >> > > >> > > wrote: > > > >> > > >> > > >>> > > > >> > > >> > > >>>> +1 (non-binding) > > > >> > > >> > > >>>> > > > >> > > >> > > >>>> On Thu, Jul 7, 2016 at 10:24 AM, Damian Guy < > > > >> > > >> damian....@gmail.com> > > > >> > > >> > > >>> wrote: > > > >> > > >> > > >>>> > > > >> > > >> > > >>>>> Thanks Henry - we've updated the KIP with an > example > > > and > > > >> the > > > >> > > new > > > >> > > >> > > config > > > >> > > >> > > >>>>> parameter required. FWIW the user doesn't register > a > > > >> > listener, > > > >> > > >> they > > > >> > > >> > > >>>> provide > > > >> > > >> > > >>>>> a host:port in config. It is expected they will > > start a > > > >> > > service > > > >> > > >> > > running > > > >> > > >> > > >>>> on > > > >> > > >> > > >>>>> that host:port that they can use to connect to the > > > >> running > > > >> > > >> > > KafkaStreams > > > >> > > >> > > >>>>> Instance. > > > >> > > >> > > >>>>> > > > >> > > >> > > >>>>> Thanks, > > > >> > > >> > > >>>>> Damian > > > >> > > >> > > >>>>> > > > >> > > >> > > >>>>> On Thu, 7 Jul 2016 at 06:06 Henry Cai > > > >> > > >> <h...@pinterest.com.invalid> > > > >> > > >> > > >>>> wrote: > > > >> > > >> > > >>>>> > > > >> > > >> > > >>>>>> It wasn't quite clear to me how the user program > > > >> interacts > > > >> > > with > > > >> > > >> > the > > > >> > > >> > > >>>>>> discovery API, especially on the user supplied > > > listener > > > >> > part, > > > >> > > >> how > > > >> > > >> > > >>> does > > > >> > > >> > > >>>>> the > > > >> > > >> > > >>>>>> user program supply that listener to KafkaStreams > > and > > > >> how > > > >> > > does > > > >> > > >> > > >>>>> KafkaStreams > > > >> > > >> > > >>>>>> know which port the user listener is running, > maybe > > a > > > >> more > > > >> > > >> > complete > > > >> > > >> > > >>>>>> end-to-end example including the steps on > > registering > > > >> the > > > >> > > user > > > >> > > >> > > >>> listener > > > >> > > >> > > >>>>> and > > > >> > > >> > > >>>>>> whether the user listener needs to be involved > with > > > task > > > >> > > >> > > >>> reassignment. > > > >> > > >> > > >>>>>> > > > >> > > >> > > >>>>>> > > > >> > > >> > > >>>>>> On Wed, Jul 6, 2016 at 9:13 PM, Guozhang Wang < > > > >> > > >> wangg...@gmail.com > > > >> > > >> > > > > > >> > > >> > > >>>>> wrote: > > > >> > > >> > > >>>>>> > > > >> > > >> > > >>>>>>> +1 > > > >> > > >> > > >>>>>>> > > > >> > > >> > > >>>>>>> On Wed, Jul 6, 2016 at 12:44 PM, Damian Guy < > > > >> > > >> > damian....@gmail.com> > > > >> > > >> > > >>>>>> wrote: > > > >> > > >> > > >>>>>>> > > > >> > > >> > > >>>>>>>> Hi all, > > > >> > > >> > > >>>>>>>> > > > >> > > >> > > >>>>>>>> I'd like to initiate the voting process for > KIP-67 > > > >> > > >> > > >>>>>>>> < > > > >> > > >> > > >>>>>>>> > > > >> > > >> > > >>>>>>> > > > >> > > >> > > >>>>>> > > > >> > > >> > > >>>>> > > > >> > > >> > > >>>> > > > >> > > >> > > >>> > > > >> > > >> > > > > > >> > > >> > > > > >> > > >> > > > >> > > > > > >> > > > > >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams > > > >> > > >> > > >>>>>>>>> > > > >> > > >> > > >>>>>>>> > > > >> > > >> > > >>>>>>>> KAFKA-3909 < > > > >> > > https://issues.apache.org/jira/browse/KAFKA-3909 > > > >> > > >> > > > > >> > > >> > is > > > >> > > >> > > >>>> the > > > >> > > >> > > >>>>>> top > > > >> > > >> > > >>>>>>>> level JIRA for this effort. > > > >> > > >> > > >>>>>>>> > > > >> > > >> > > >>>>>>>> Initial PRs for Step 1 of the process are: > > > >> > > >> > > >>>>>>>> Expose State Store Names < > > > >> > > >> > > >>>> https://github.com/apache/kafka/pull/1526> > > > >> > > >> > > >>>>>> and > > > >> > > >> > > >>>>>>>> Query Local State Stores < > > > >> > > >> > > >>>> https://github.com/apache/kafka/pull/1565> > > > >> > > >> > > >>>>>>>> > > > >> > > >> > > >>>>>>>> Thanks, > > > >> > > >> > > >>>>>>>> Damian > > > >> > > >> > > >>>>>>>> > > > >> > > >> > > >>>>>>> > > > >> > > >> > > >>>>>>> > > > >> > > >> > > >>>>>>> > > > >> > > >> > > >>>>>>> -- > > > >> > > >> > > >>>>>>> -- Guozhang > > > >> > > >> > > >>>>>>> > > > >> > > >> > > >>>>>> > > > >> > > >> > > >>>>> > > > >> > > >> > > >>>> > > > >> > > >> > > >>>> > > > >> > > >> > > >>>> > > > >> > > >> > > >>>> -- > > > >> > > >> > > >>>> Best regards, > > > >> > > >> > > >>>> Michael Noll > > > >> > > >> > > >>>> > > > >> > > >> > > >>>> > > > >> > > >> > > >>>> > > > >> > > >> > > >>>> *Michael G. Noll | Product Manager | Confluent | +1 > > > >> > > >> > > 650.453.5860Download > > > >> > > >> > > >>>> Apache Kafka and Confluent Platform: > > > >> > www.confluent.io/download > > > >> > > >> > > >>>> <http://www.confluent.io/download>* > > > >> > > >> > > >>>> > > > >> > > >> > > >>> > > > >> > > >> > > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > >> > > >> > > > > >> > > >> > -- > > > >> > > >> > Best regards, > > > >> > > >> > Michael Noll > > > >> > > >> > > > > >> > > >> > > > > >> > > >> > > > > >> > > >> > *Michael G. Noll | Product Manager | Confluent | +1 > > > >> > > 650.453.5860Download > > > >> > > >> > Apache Kafka and Confluent Platform: > > www.confluent.io/download > > > >> > > >> > <http://www.confluent.io/download>* > > > >> > > >> > > > > >> > > >> > > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > -- > > > >> > > > Best regards, > > > >> > > > Michael Noll > > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > *Michael G. Noll | Product Manager | Confluent | +1 > > 650.453.5860 > > > >> > > > <%2B1%20650.453.5860>Download Apache Kafka and Confluent > > Platform: > > > >> > > > www.confluent.io/download <http://www.confluent.io/download>* > > > >> > > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > -- > > > >> > > Best regards, > > > >> > > Michael Noll > > > >> > > > > > >> > > > > > >> > > > > > >> > > *Michael G. Noll | Product Manager | Confluent | +1 > > > >> 650.453.5860Download > > > >> > > Apache Kafka and Confluent Platform: www.confluent.io/download > > > >> > > <http://www.confluent.io/download>* > > > >> > > > > > >> > > > > >> > > > > >> > > > > >> > -- > > > >> > Thanks, > > > >> > Neha > > > >> > > > > >> > > > >> > > > >> > > > >> -- > > > >> -- Guozhang > > > >> > > > > > > > > > > > > > > > -- > > Best regards, > > Michael Noll > > > > > > > > *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860Download > > Apache Kafka and Confluent Platform: www.confluent.io/download > > <http://www.confluent.io/download>* > > > -- -- Guozhang