You create your custom store, e.g.:

/**
 * An interface your custom store provides
 * @param <K>
 * @param <V>
 */
interface MyStoreType<K,V> {
    V get(K key);
    void put(K key, V value);
}

/**
 * Implement your store
 * @param <K>
 * @param <V>
 */
public class MyStoreImpl<K,V> implements StateStore, MyStoreType<K,V> {
    // implementation of the store goes here
}
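
For illustration, a minimal in-memory implementation might look something
like the sketch below (the HashMap backing and the mostly no-op lifecycle
callbacks are placeholders, not part of the proposal, and the exact
StateStore callback signatures may differ between versions):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;

public class MyStoreImpl<K, V> implements StateStore, MyStoreType<K, V> {
    private final String name;
    private final Map<K, V> map = new HashMap<>();

    public MyStoreImpl(final String name) {
        this.name = name;
    }

    // MyStoreType: the interface exposed to queries
    @Override
    public V get(final K key) {
        return map.get(key);
    }

    @Override
    public void put(final K key, final V value) {
        map.put(key, value);
    }

    // StateStore: lifecycle callbacks invoked by the streams runtime
    @Override
    public String name() {
        return name;
    }

    @Override
    public void init(final ProcessorContext context, final StateStore root) {
        // a real store would register itself for restoration here
    }

    @Override
    public void flush() {
        // nothing to flush for a purely in-memory store
    }

    @Override
    public void close() {
        map.clear();
    }

    @Override
    public boolean persistent() {
        return false;
    }
}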


Provide an implementation of QueryableStoreType to find stores that match
your custom store:

/**
 * Implement QueryableStoreType to find stores that match your custom store
 * @param <K>
 * @param <V>
 */
public class MyQueryableType<K, V> implements QueryableStoreType<MyStoreType<K, V>> {

    @Override
    public boolean accepts(final StateStore stateStore) {
        // match any store that implements our custom interface
        return stateStore instanceof MyStoreType;
    }

    @Override
    public MyStoreType<K, V> create(final StateStoreProvider storeProvider,
                                    final String storeName) {
        return new MyCompositeStore<>(storeName, storeProvider);
    }
}
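
Note: accepts(...) is what the StateStoreProvider uses (via getStores(...)
in the composite below) to filter out underlying stores that aren't of the
expected type, and create(...) returns the user-facing wrapper that fans
out across the matching stores.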


Create a composite type to wrap the potentially many underlying instances
of the store (there will be one per partition):

/**
 * Provide a wrapper over the underlying store instances.
 */
public class MyCompositeStore<K, V> implements MyStoreType<K, V> {
    private final String storeName;
    private final StateStoreProvider provider;

    public MyCompositeStore(final String storeName,
                            final StateStoreProvider provider) {
        this.storeName = storeName;
        this.provider = provider;
    }

    @Override
    public V get(final K key) {
        final List<MyStoreType<K, V>> stores =
            provider.getStores(storeName, new MyQueryableType<K, V>());
        // iterate over the per-partition stores looking for the key
        for (final MyStoreType<K, V> store : stores) {
            final V value = store.get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    @Override
    public void put(final K key, final V value) {
        // the composite is a read-only view for queries, so writes are rejected
        throw new UnsupportedOperationException("put() not supported on a queryable view");
    }
}


Look up your new store from KafkaStreams:

final MyStoreType<String, Long> store =
    kafkaStreams.store("my-store", new MyQueryableType<>());


So we get type safety, and we can constrain the interfaces returned to
read-only versions (which is what we are doing for the KeyValue and Window
stores).
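
For the built-in stores the lookup will look just the same, only with the
library-provided types, e.g. something like this (names as per the current
proposal, so treat it as a sketch):

final ReadOnlyKeyValueStore<String, Long> wordCounts =
    kafkaStreams.store("word-count", QueryableStoreTypes.<String, Long>keyValueStore());
final Long count = wordCounts.get("hello");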

HTH,
Damian

On Wed, 13 Jul 2016 at 15:30 Jay Kreps <j...@confluent.io> wrote:

> But to avoid the cast you introduce a bunch of magic that doesn't really
> bring type safety, right? Or possibly I'm misunderstanding, how do I plug
> in a new store type and get access to it? Can you give the steps for that?
>
> -Jay
>
> On Wed, Jul 13, 2016 at 10:47 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
>
> > Personally I think the additional complexity of the introduced "
> > QueryableStoreType" interface is still acceptable from a user's point of
> > view: this is the only interface we are exposing to users, and other
> > wrappers are all internal classes.
> >
> > Regarding "QueryableStoreTypes", maybe we can consider declaring its
> > "QueryableStoreTypeMatcher" as private instead of public, since
> > "QueryableStoreTypes" is just used as a convenient manner for using
> > library-provided types, like serialization/Serdes.java.
> >
> > With this the only additional interface the library is exposing is "
> > QueryableStoreType", and users optionally can just use
> > "QueryableStoreTypes"
> > to conveniently create library-provided store types.
> >
> >
> > Guozhang
> >
> >
> > On Wed, Jul 13, 2016 at 7:58 AM, Neha Narkhede <n...@confluent.io>
> wrote:
> >
> > > Damian -- appreciate the example code and you convinced me. Agree that
> > the
> > > class approach is better and renaming to KafkaStreamsMetadata along
> with
> > > renaming the API methods will address the issues I was referring to.
> > >
> > > One other thing I wanted to get people's thoughts on was the way we are
> > > proposing to handle different store types. I am sure you guys have
> > thought
> > > about the tradeoffs of using the store wrappers and matchers (
> > > QueryableStoreType) vs just making users cast the returned store to the
> > > type they would expect to use. That is simple but the obvious downside
> is
> > > that it is likely to result in exceptions for users that don't know
> what
> > > they are doing.
> > >
> > > In my experience of dealing with apps that would use queryable state,
> it
> > > appears to me that a majority would just use the key value store.
> Partly
> > > because that will suffice and partly because people might just follow
> the
> > > simpler examples we provide that use the key-value store. For advanced
> users,
> > > they will be aware of the reason they want to use the windowed store
> and
> > > will know how to cast it. The advantage of the current approach is that
> > it
> > > is likely more robust and general but introduces more interfaces
> > > and wrapper code.
> > >
> > > I tend to prefer simplicity to optimize for the general case, but
> curious
> > > to get people's thoughts on this as well.
> > >
> > > On Wed, Jul 13, 2016 at 8:13 AM, Jim Jagielski <j...@jagunet.com>
> wrote:
> > >
> > > > IMO, that makes the most sense.
> > > >
> > > > > On Jul 12, 2016, at 5:11 PM, Ismael Juma <ism...@juma.me.uk>
> wrote:
> > > > >
> > > > > Hi Damian,
> > > > >
> > > > > How about StreamsMetadata instead? The general naming pattern seems
> > to
> > > > > avoid the `Kafka` prefix for everything outside of `KafkaStreams`
> > > itself.
> > > > >
> > > > > Ismael
> > > > >
> > > > > On Tue, Jul 12, 2016 at 7:14 PM, Damian Guy <damian....@gmail.com>
> > > > wrote:
> > > > >
> > > > >> Hi,
> > > > >>
> > > > >> I agree with point 1. application.server is a better name for the
> > > config
> > > > >> (we'll change this). However, on point 2 I think we should stick
> > > mostly
> > > > >> with what we already have. I've tried both ways of doing this when
> > > > working
> > > > >> on the JIRA and building examples and I find the current approach
> > more
> > > > >> intuitive and easier to use than the Map based approach.
> > > > >> However, there is probably a naming issue. We should rename
> > > > >> KafkaStreamsInstance to KafkaStreamsMetadata. This class is very
> > > simple,
> > > > >> but provides all the information a developer needs to be able to
> > find
> > > > the
> > > > >> instance(s) of a Streams application that a particular store is
> > > running
> > > > on,
> > > > >> i.e.,
> > > > >>
> > > > >> public class KafkaStreamsMetadata {
> > > > >>    private final HostInfo hostInfo;
> > > > >>    private final Set<String> stateStoreNames;
> > > > >>    private final Set<TopicPartition> topicPartitions;
> > > > >>
> > > > >>
> > > > >> So using the API to route to a new host is fairly simple,
> > particularly
> > > > in
> > > > >> the case when you want to find the host for a particular key,
> i.e.,
> > > > >>
> > > > >> final KafkaStreams kafkaStreams = createKafkaStreams();
> > > > >> final KafkaStreamsMetadata streamsMetadata =
> > > > >> kafkaStreams.instanceWithKey("word-count", "hello",
> > > > >> Serdes.String().serializer());
> > > > >> http.get("http://"; + streamsMetadata.host() + ":" +
> > > > >> streamsMetadata.port() + "/get/word-count/hello");
> > > > >>
> > > > >>
> > > > >> And if you want to do a scatter gather approach:
> > > > >>
> > > > >> final KafkaStreams kafkaStreams = createKafkaStreams();
> > > > >> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
> > > > >> kafkaStreams.allInstancesWithStore("word-count");
> > > > >> for (KafkaStreamsMetadata streamsMetadata :
> kafkaStreamsMetadatas) {
> > > > >>    http.get("http://"; + streamsMetadata.host() + ":" +
> > > > >> streamsMetadata.port() + "/get/word-count/hello");
> > > > >>    ...
> > > > >> }
> > > > >>
> > > > >>
> > > > >> And if you iterated over all instances:
> > > > >>
> > > > >> final KafkaStreams kafkaStreams = createKafkaStreams();
> > > > >> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
> > > > >> kafkaStreams.allInstances();
> > > > >> for (KafkaStreamsMetadata streamsMetadata :
> kafkaStreamsMetadatas) {
> > > > >>    if (streamsMetadata.stateStoreNames().contains("word-count")) {
> > > > >>        http.get("http://"; + streamsMetadata.host() + ":" +
> > > > >> streamsMetadata.port() + "/get/word-count/hello");
> > > > >>        ...
> > > > >>    }
> > > > >> }
> > > > >>
> > > > >>
> > > > >> If we were to change this to use Map<HostInfo, Set<TaskMetadata>>
> > for
> > > > the
> > > > >> most part users would need to iterate over the entry or key set.
> > > > Examples:
> > > > >>
> > > > >> Finding an instance by key is a little odd:
> > > > >>
> > > > >> final KafkaStreams kafkaStreams = createKafkaStreams();
> > > > >> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
> > > > >> kafkaStreams.instanceWithKey("word-count","hello",
> > > > >> Serdes.String().serializer());
> > > > >> // this is a bit odd as i only expect one:
> > > > >> for (HostInfo hostInfo : streamsMetadata.keySet()) {
> > > > >>    http.get("http://"; + streamsMetadata.host() + ":" +
> > > > >> streamsMetadata.port() + "/get/word-count/hello");
> > > > >> }
> > > > >>
> > > > >>
> > > > >> The scatter/gather by store is fairly similar to the previous
> > example:
> > > > >>
> > > > >> final KafkaStreams kafkaStreams = createKafkaStreams();
> > > > >> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
> > > > >> kafkaStreams.allInstancesWithStore("word-count");
> > > > >> for(HostInfo hostInfo : streamsMetadata.keySet()) {
> > > > >>    http.get("http://"; + hostInfo.host() + ":" + hostInfo.port() +
> > > > >> "/get/word-count/hello");
> > > > >>    ...
> > > > >> }
> > > > >>
> > > > >> And iterating over all instances:
> > > > >>
> > > > >> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
> > > > >> kafkaStreams.allInstances();
> > > > >> for (Map.Entry<HostInfo, Set<TaskMetadata>> entry :
> > > > >> streamsMetadata.entrySet()) {
> > > > >>    for (TaskMetadata taskMetadata : entry.getValue()) {
> > > > >>        if (taskMetadata.stateStoreNames().contains("word-count"))
> {
> > > > >>            http.get("http://" + entry.getKey().host() + ":" +
> > > > >> entry.getKey().port() + "/get/word-count/hello");
> > > > >>            ...
> > > > >>        }
> > > > >>    }
> > > > >> }
> > > > >>
> > > > >>
> > > > >> IMO - having a class we return is the better approach as it nicely
> > > wraps
> > > > >> the related things, i.e., host:port, store names, topic partitions
> > into
> > > > an
> > > > >> Object that is easy to use. Further we could add some behaviour to
> > > this
> > > > >> class if we felt it necessary, e.g., hasStore(storeName) etc.
> > > > >>
> > > > >> Anyway, I'm interested in your thoughts.
> > > > >>
> > > > >> Thanks,
> > > > >> Damian
> > > > >>
> > > > >> On Mon, 11 Jul 2016 at 13:47 Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > > >>
> > > > >>> 1. Re StreamsConfig.USER_ENDPOINT_CONFIG:
> > > > >>>
> > > > >>> I agree with Neha that Kafka Streams can provide the bare minimum
> > > APIs
> > > > >> just
> > > > >>> for host/port, and the user's own layer can provide whatever URL / proxy
> > > > >>> address they want to build on top of it.
> > > > >>> they want to build on top of it.
> > > > >>>
> > > > >>>
> > > > >>> 2. Re Improving KafkaStreamsInstance interface:
> > > > >>>
> > > > >>> Users are indeed aware of the "TaskId" class, which is not part of the
> > > > >>> internal packages and is exposed in the PartitionGrouper interface that
> > > > >>> users can implement; each task is assigned a set of input topic
> > > > >>> partitions.
> > > > >>> So we can probably change the APIs as:
> > > > >>>
> > > > >>> Map<HostState, Set<TaskMetadata>> KafkaStreams.getAllTasks()
> where
> > > > >>> TaskMetadata has fields such as taskId, list of assigned
> > partitions,
> > > > list
> > > > >>> of state store names; and HostState can include hostname / port.
> > The
> > > > port
> > > > >>> is the listening port of a user-defined listener that users
> provide
> > > to
> > > > >>> listen for queries (e.g., using REST APIs).
> > > > >>>
> > > > >>> Map<HostState, Set<TaskMetadata>>
> > > KafkaStreams.getTasksWithStore(String
> > > > >> /*
> > > > >>> storeName */) would return only the hosts and their assigned
> tasks
> > if
> > > > at
> > > > >>> least one of the tasks include the given store name.
> > > > >>>
> > > > >>> Map<HostState, Set<TaskMetadata>>
> > > > KafkaStreams.getTaskWithStoreAndKey(Key
> > > > >>> k, String /* storeName */, StreamPartitioner partitioner) would
> > > return
> > > > >> only
> > > > >>> the host and their assigned task if the store with the store name
> > > has a
> > > > >>> particular key, according to the partitioner behavior.
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> Guozhang
> > > > >>>
> > > > >>>
> > > > >>> On Sun, Jul 10, 2016 at 11:21 AM, Neha Narkhede <
> n...@confluent.io
> > >
> > > > >> wrote:
> > > > >>>
> > > > >>>> A few thoughts that became apparent after observing example code
> of
> > > what
> > > > >> an
> > > > >>>> application architecture and code might look like with these
> > > changes.
> > > > >>>> Apologies for the late realization.
> > > > >>>>
> > > > >>>> 1. "user.endpoint" will be very differently defined for
> respective
> > > > >>>> applications. I don't think Kafka Streams should generalize to
> > > accept
> > > > >> any
> > > > >>>> connection URL as we expect to only expose metadata expressed as
> > > > >> HostInfo
> > > > >>>> (which is defined by host & port) and hence need to interpret
> the
> > > > >>>> "user.endpoint" as host & port. Applications will have their own
> > > > >> endpoint
> > > > >>>> configs that will take many forms and they will be responsible
> for
> > > > >>> parsing
> > > > >>>> out host and port and configuring Kafka Streams accordingly.
> > > > >>>>
> > > > >>>> If we are in fact limiting to host and port, I wonder if we
> should
> > > > >> change
> > > > >>>> the name of "user.endpoint" into something more specific. We
> have
> > > > >> clients
> > > > >>>> expose host/port pairs as "bootstrap.servers". Should this be
> > > > >>>> "application.server"?
> > > > >>>>
> > > > >>>> 2. I don't think we should expose another abstraction called
> > > > >>>> KafkaStreamsInstance to the user. This is related to the
> > discussion
> > > of
> > > > >>> the
> > > > >>>> right abstraction that we want to expose to an application. The
> > > > >>> abstraction
> > > > >>>> discussion itself should probably be part of the KIP itself, so let
> > > > >>>> me give a
> > > > >>>> quick summary of my thoughts here:
> > > > >>>> 1. The person implementing an application using Queryable State
> > has
> > > > >>> likely
> > > > >>>> already made some choices for the service layer: a REST
> framework,
> > > > >> Thrift,
> > > > >>>> or whatever. We don't really want to add another RPC framework
> to
> > > this
> > > > >>> mix,
> > > > >>>> nor do we want to try to make Kafka's RPC mechanism general
> > purpose.
> > > > >>>> 2. Likewise, it should be clear that the API you want to expose
> to
> > > the
> > > > >>>> front-end/client service is not necessarily the API you'd need
> > > > >> internally
> > > > >>>> as there may be additional filtering/processing in the router.
> > > > >>>>
> > > > >>>> Given these constraints, what we prefer to add is a fairly
> > low-level
> > > > >>>> "toolbox" that would let you do anything you want, but requires
> to
> > > > >> route
> > > > >>>> and perform any aggregation or processing yourself. This pattern
> > is
> > > > >>>> not recommended for all kinds of services/apps, but there are
> > > > >> definitely
> > > > >>> a
> > > > >>>> category of things where it is a big win and other advanced
> > > > >> applications
> > > > >>>> are out-of-scope.
> > > > >>>>
> > > > >>>> The APIs we expose should take the following things into
> > > > consideration:
> > > > >>>> 1. Make it clear to the user that they will do the routing,
> > > > >> aggregation,
> > > > >>>> processing themselves. So the bare minimum that we want to
> expose
> > is
> > > > >>> store
> > > > >>>> and partition metadata per application server identified by the
> > host
> > > > >> and
> > > > >>>> port.
> > > > >>>> 2. Ensure that the API exposes abstractions that are known to
> the
> > > user
> > > > >> or
> > > > >>>> are intuitive to the user.
> > > > >>>> 3. Avoid exposing internal objects or implementation details to
> > the
> > > > >> user.
> > > > >>>>
> > > > >>>> So tying all this into answering the question of what we should
> > > expose
> > > > >>>> through the APIs -
> > > > >>>>
> > > > >>>> In Kafka Streams, the user is aware of the concept of tasks and
> > > > >>> partitions
> > > > >>>> since the application scales with the number of partitions and
> > tasks
> > > > >> are
> > > > >>>> the construct for logical parallelism. The user is also aware of
> > the
> > > > >>>> concept of state stores though until now they were not user
> > > > accessible.
> > > > >>>> With Queryable State, the bare minimum abstractions that we need
> > to
> > > > >>> expose
> > > > >>>> are state stores and the location of state store partitions.
> > > > >>>>
> > > > >>>> For exposing the state stores, the getStore() APIs look good
> but I
> > > > >> think
> > > > >>>> for locating the state store partitions, we should go back to
> the
> > > > >>> original
> > > > >>>> proposal of simply exposing some sort of getPartitionMetadata()
> > that
> > > > >>>> returns a PartitionMetadata or TaskMetadata object keyed by
> > > HostInfo.
> > > > >>>>
> > > > >>>> The application will convert the HostInfo (host and port) into
> > some
> > > > >>>> connection URL to talk to the other app instances via its own
> RPC
> > > > >>> mechanism
> > > > >>>> depending on whether it needs to scatter-gather or just query.
> The
> > > > >>>> application will know how a key maps to a partition and through
> > > > >>>> PartitionMetadata it will know how to locate the server that
> hosts
> > > the
> > > > >>>> store that has the partition hosting that key.
> > > > >>>>
> > > > >>>> On Fri, Jul 8, 2016 at 9:40 AM, Michael Noll <
> > mich...@confluent.io>
> > > > >>> wrote:
> > > > >>>>
> > > > >>>>> Addendum in case my previous email wasn't clear:
> > > > >>>>>
> > > > >>>>>> So for any given instance of a streams application there will
> > > never
> > > > >>> be
> > > > >>>>> both a v1 and v2 alive at the same time
> > > > >>>>>
> > > > >>>>> That's right.  But the current live instance will be able to
> tell
> > > > >> other
> > > > >>>>> instances, via its endpoint setting, whether it wants to be
> > > contacted
> > > > >>> at
> > > > >>>> v1
> > > > >>>>> or at v2.  The other instances can't guess that.  Think: if an
> > > older
> > > > >>>>> instance would manually compose the "rest" of an endpoint URI,
> > > having
> > > > >>>> only
> > > > >>>>> the host and port from the endpoint setting, it might not know
> > that
> > > > >> the
> > > > >>>> new
> > > > >>>>> instances have a different endpoint suffix, for example.
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> On Fri, Jul 8, 2016 at 6:37 PM, Michael Noll <
> > mich...@confluent.io
> > > >
> > > > >>>> wrote:
> > > > >>>>>
> > > > >>>>>> Damian,
> > > > >>>>>>
> > > > >>>>>> about the rolling upgrade comment:  An instance A will contact
> > > > >>> another
> > > > >>>>>> instance B by the latter's endpoint, right?  So if A has no
> > > > >>>>>> information available other than B's host and port, then how should
> > > > >>> instance
> > > > >>>> A
> > > > >>>>>> know whether it should call B at /v1/ or at /v2/?  I agree
> that
> > my
> > > > >>>>>> suggestion isn't foolproof, but it is afaict better than the
> > > > >>> host:port
> > > > >>>>>> approach.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> On Fri, Jul 8, 2016 at 5:15 PM, Damian Guy <
> > damian....@gmail.com>
> > > > >>>> wrote:
> > > > >>>>>>
> > > > >>>>>>> Michael - I'm OK with changing it to a string. Anyone else
> > have
> > > a
> > > > >>>>> strong
> > > > >>>>>>> opinion on this?
> > > > >>>>>>>
> > > > >>>>>>> FWIW - I don't think it will work fine as is during the
> rolling
> > > > >>>> upgrade
> > > > >>>>>>> scenario as the service that is listening on the port needs
> to
> > be
> > > > >>>>> embedded
> > > > >>>>>>> within each instance. So for any given instance of a streams
> > > > >>>> application
> > > > >>>>>>> there will never be both a v1 and v2 alive at the same time
> > > > >> (unless
> > > > >>> of
> > > > >>>>>>> course the process didn't shutdown properly, but then you
> have
> > > > >>> another
> > > > >>>>>>> problem...).
> > > > >>>>>>>
> > > > >>>>>>> On Fri, 8 Jul 2016 at 15:26 Michael Noll <
> mich...@confluent.io
> > >
> > > > >>>> wrote:
> > > > >>>>>>>
> > > > >>>>>>>> I have one further comment about
> > > > >>>> `StreamsConfig.USER_ENDPOINT_CONFIG`.
> > > > >>>>>>>>
> > > > >>>>>>>> I think we should consider not restricting the value of
> > this
> > > > >>>>> setting
> > > > >>>>>>> to
> > > > >>>>>>>> only `host:port` pairs.  By design, this setting is
> capturing
> > > > >>>>>>> user-driven
> > > > >>>>>>>> metadata to define an endpoint, so why restrict the
> creativity
> > > > >> or
> > > > >>>>>>>> flexibility of our users?  I can imagine, for example, that
> > > > >> users
> > > > >>>>> would
> > > > >>>>>>>> like to set values such as `https://host:port/api/rest/v1/`
> > in
> > > > >>> this
> > > > >>>>>>> field
> > > > >>>>>>>> (e.g. being able to distinguish between `.../v1/` and
> > `.../v2/`
> > > > >>> may
> > > > >>>>>>> help in
> > > > >>>>>>>> scenarios such as rolling upgrades, where, during the
> upgrade,
> > > > >>> older
> > > > >>>>>>>> instances may need to coexist with newer instances).
> > > > >>>>>>>>
> > > > >>>>>>>> That said, I don't have a strong opinion here.
> > > > >>>>>>>>
> > > > >>>>>>>> -Michael
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> On Fri, Jul 8, 2016 at 2:55 PM, Matthias J. Sax <
> > > > >>>>> matth...@confluent.io>
> > > > >>>>>>>> wrote:
> > > > >>>>>>>>
> > > > >>>>>>>>> +1
> > > > >>>>>>>>>
> > > > >>>>>>>>> On 07/08/2016 11:03 AM, Eno Thereska wrote:
> > > > >>>>>>>>>> +1 (non-binding)
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> On 7 Jul 2016, at 18:31, Sriram Subramanian <
> > > > >>> r...@confluent.io>
> > > > >>>>>>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> +1
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Thu, Jul 7, 2016 at 9:53 AM, Henry Cai
> > > > >>>>>>> <h...@pinterest.com.invalid
> > > > >>>>>>>>>
> > > > >>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>> +1
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On Thu, Jul 7, 2016 at 6:48 AM, Michael Noll <
> > > > >>>>>>> mich...@confluent.io>
> > > > >>>>>>>>> wrote:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> +1 (non-binding)
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On Thu, Jul 7, 2016 at 10:24 AM, Damian Guy <
> > > > >>>>>>> damian....@gmail.com>
> > > > >>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Thanks Henry - we've updated the KIP with an example
> and
> > > > >>> the
> > > > >>>>> new
> > > > >>>>>>>>> config
> > > > >>>>>>>>>>>>>> parameter required. FWIW the user doesn't register a
> > > > >>>> listener,
> > > > >>>>>>> they
> > > > >>>>>>>>>>>>> provide
> > > > >>>>>>>>>>>>>> a host:port in config. It is expected they will start
> a
> > > > >>>>> service
> > > > >>>>>>>>> running
> > > > >>>>>>>>>>>>> on
> > > > >>>>>>>>>>>>>> that host:port that they can use to connect to the
> > > > >> running
> > > > >>>>>>>>> KafkaStreams
> > > > >>>>>>>>>>>>>> Instance.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Thanks,
> > > > >>>>>>>>>>>>>> Damian
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Thu, 7 Jul 2016 at 06:06 Henry Cai
> > > > >>>>>>> <h...@pinterest.com.invalid>
> > > > >>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> It wasn't quite clear to me how the user program
> > > > >>> interacts
> > > > >>>>> with
> > > > >>>>>>>> the
> > > > >>>>>>>>>>>>>>> discovery API, especially on the user supplied
> listener
> > > > >>>> part,
> > > > >>>>>>> how
> > > > >>>>>>>>>>>> does
> > > > >>>>>>>>>>>>>> the
> > > > >>>>>>>>>>>>>>> user program supply that listener to KafkaStreams and
> > > > >> how
> > > > >>>>> does
> > > > >>>>>>>>>>>>>> KafkaStreams
> > > > >>>>>>>>>>>>>>> know which port the user listener is running on, maybe a
> > > > >>> more
> > > > >>>>>>>> complete
> > > > >>>>>>>>>>>>>>> end-to-end example including the steps on registering
> > > > >> the
> > > > >>>>> user
> > > > >>>>>>>>>>>> listener
> > > > >>>>>>>>>>>>>> and
> > > > >>>>>>>>>>>>>>> whether the user listener needs to be involved with
> > > > >> task
> > > > >>>>>>>>>>>> reassignment.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 9:13 PM, Guozhang Wang <
> > > > >>>>>>> wangg...@gmail.com
> > > > >>>>>>>>>
> > > > >>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> +1
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 12:44 PM, Damian Guy <
> > > > >>>>>>>> damian....@gmail.com>
> > > > >>>>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Hi all,
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> I'd like to initiate the voting process for KIP-67
> > > > >>>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> KAFKA-3909 <https://issues.apache.org/jira/browse/KAFKA-3909>
> > > > >>>>>>>>>>>>>>>>> is the top level JIRA for this effort.
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Initial PRs for Step 1 of the process are:
> > > > >>>>>>>>>>>>>>>>> Expose State Store Names <https://github.com/apache/kafka/pull/1526> and
> > > > >>>>>>>>>>>>>>>>> Query Local State Stores <https://github.com/apache/kafka/pull/1565>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Thanks,
> > > > >>>>>>>>>>>>>>>>> Damian
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> --
> > > > >>>>>>>>>>>>>>>> -- Guozhang
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> --
> > > > >>>>>>>>>>>>> Best regards,
> > > > >>>>>>>>>>>>> Michael Noll
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
> > > > >>>>>>>>>>>>> Download Apache Kafka and Confluent Platform: www.confluent.io/download
> > > > >>>>>>>>>>>>> <http://www.confluent.io/download>*
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> --
> > > > >>>>>>>> Best regards,
> > > > >>>>>>>> Michael Noll
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
> > > > >>>>>>>> Download Apache Kafka and Confluent Platform: www.confluent.io/download
> > > > >>>>>>>> <http://www.confluent.io/download>*
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> --
> > > > >>>>>> Best regards,
> > > > >>>>>> Michael Noll
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
> > > > >>>>>> Download Apache Kafka and Confluent Platform:
> > > > >>>>>> www.confluent.io/download <http://www.confluent.io/download>*
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> --
> > > > >>>>> Best regards,
> > > > >>>>> Michael Noll
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>
> > > > >>>>> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
> > > > >>>>> Download Apache Kafka and Confluent Platform: www.confluent.io/download
> > > > >>>>> <http://www.confluent.io/download>*
> > > > >>>>>
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> --
> > > > >>>> Thanks,
> > > > >>>> Neha
> > > > >>>>
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>> --
> > > > >>> -- Guozhang
> > > > >>>
> > > > >>
> > > >
> > > >
> > >
> > >
> > > --
> > > Thanks,
> > > Neha
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
