IMO, that makes the most sense.

> On Jul 12, 2016, at 5:11 PM, Ismael Juma <ism...@juma.me.uk> wrote:
> 
> Hi Damian,
> 
> How about StreamsMetadata instead? The general naming pattern seems to
> avoid the `Kafka` prefix for everything outside of `KafkaStreams` itself.
> 
> Ismael
> 
> On Tue, Jul 12, 2016 at 7:14 PM, Damian Guy <damian....@gmail.com> wrote:
> 
>> Hi,
>> 
>> I agree with point 1. application.server is a better name for the config
>> (we'll change this). However, on point 2 I think we should stick mostly
>> with what we already have. I've tried both ways of doing this when working
>> on the JIRA and building examples and I find the current approach more
>> intuitive and easier to use than the Map based approach.
>> However, there is probably a naming issue. We should rename
>> KafkaStreamsInstance to KafkaStreamsMetadata. This class is very simple,
>> but provides all the information a developer needs to be able to find the
>> instance(s) of a Streams application that a particular store is running on,
>> i.e.,
>> 
>> public class KafkaStreamsMetadata {
>>    private final HostInfo hostInfo;
>>    private final Set<String> stateStoreNames;
>>    private final Set<TopicPartition> topicPartitions;
>>    // constructor and accessors omitted
>> }
>> 
>> 
>> So using the API to route to a new host is fairly simple, particularly in
>> the case when you want to find the host for a particular key, i.e.,
>> 
>> final KafkaStreams kafkaStreams = createKafkaStreams();
>> final KafkaStreamsMetadata streamsMetadata =
>> kafkaStreams.instanceWithKey("word-count", "hello",
>> Serdes.String().serializer());
>> http.get("http://" + streamsMetadata.host() + ":" +
>> streamsMetadata.port() + "/get/word-count/hello");
>> 
>> 
>> And if you want to do a scatter gather approach:
>> 
>> final KafkaStreams kafkaStreams = createKafkaStreams();
>> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
>> kafkaStreams.allInstancesWithStore("word-count");
>> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
>>    http.get("http://" + streamsMetadata.host() + ":" +
>> streamsMetadata.port() + "/get/word-count/hello");
>>    ...
>> }
>> 
>> 
>> And if you iterated over all instances:
>> 
>> final KafkaStreams kafkaStreams = createKafkaStreams();
>> final Collection<KafkaStreamsMetadata> kafkaStreamsMetadatas =
>> kafkaStreams.allInstances();
>> for (KafkaStreamsMetadata streamsMetadata : kafkaStreamsMetadatas) {
>>    if (streamsMetadata.stateStoreNames().contains("word-count")) {
>>        http.get("http://" + streamsMetadata.host() + ":" +
>> streamsMetadata.port() + "/get/word-count/hello");
>>        ...
>>    }
>> }
>> 
>> 
>> If we were to change this to use Map<HostInfo, Set<TaskMetadata>> for the
>> most part users would need to iterate over the entry or key set. Examples:
>> 
>> Finding an instance by key is a little odd:
>> 
>> final KafkaStreams kafkaStreams = createKafkaStreams();
>> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
>> kafkaStreams.instanceWithKey("word-count","hello",
>> Serdes.String().serializer());
>> // this is a bit odd as I only expect one:
>> for (HostInfo hostInfo : streamsMetadata.keySet()) {
>>    http.get("http://" + hostInfo.host() + ":" +
>> hostInfo.port() + "/get/word-count/hello");
>> }
>> 
>> 
>> The scatter/gather by store is fairly similar to the previous example:
>> 
>> final KafkaStreams kafkaStreams = createKafkaStreams();
>> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
>> kafkaStreams.allInstancesWithStore("word-count");
>> for (HostInfo hostInfo : streamsMetadata.keySet()) {
>>    http.get("http://" + hostInfo.host() + ":" + hostInfo.port() +
>> "/get/word-count/hello");
>>    ...
>> }
>> 
>> And iterating over all instances:
>> 
>> final Map<HostInfo, Set<TaskMetadata>> streamsMetadata =
>> kafkaStreams.allInstances();
>> for (Map.Entry<HostInfo, Set<TaskMetadata>> entry :
>> streamsMetadata.entrySet()) {
>>    for (TaskMetadata taskMetadata : entry.getValue()) {
>>        if (taskMetadata.stateStoreNames().contains("word-count")) {
>>            http.get("http://" + entry.getKey().host() + ":" +
>> entry.getKey().port() + "/get/word-count/hello");
>>            ...
>>        }
>>    }
>> }
>> 
>> 
>> IMO - having a class we return is the better approach as it nicely wraps
>> the related things, i.e., host:port, store names, topic partitions, into an
>> Object that is easy to use. Further, we could add some behaviour to this
>> class if we felt it necessary, e.g., hasStore(storeName) etc.
>> 
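To make the "class we return" option concrete, here is a minimal, self-contained sketch of what such a metadata class with hasStore(storeName) could look like. It is an illustration only: HostInfo below is a hypothetical stand-in with just a host and a port, the name StreamsMetadata follows Ismael's suggestion, the accessor names are assumptions, and the topic partitions field is left out to keep it short.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for the HostInfo mentioned in the thread: a host and a port.
final class HostInfo {
    private final String host;
    private final int port;

    HostInfo(String host, int port) {
        this.host = host;
        this.port = port;
    }

    String host() { return host; }
    int port() { return port; }
}

// Sketch of the metadata class under discussion; name and accessors are assumptions.
final class StreamsMetadata {
    private final HostInfo hostInfo;
    private final Set<String> stateStoreNames;

    StreamsMetadata(HostInfo hostInfo, Set<String> stateStoreNames) {
        this.hostInfo = hostInfo;
        // Defensive copy so callers cannot mutate the metadata afterwards.
        this.stateStoreNames =
            Collections.unmodifiableSet(new LinkedHashSet<>(stateStoreNames));
    }

    String host() { return hostInfo.host(); }
    int port() { return hostInfo.port(); }
    Set<String> stateStoreNames() { return stateStoreNames; }

    // The convenience method suggested in the mail above.
    boolean hasStore(String storeName) {
        return stateStoreNames.contains(storeName);
    }
}

class StreamsMetadataDemo {
    public static void main(String[] args) {
        StreamsMetadata metadata = new StreamsMetadata(
            new HostInfo("localhost", 7070),
            new LinkedHashSet<>(Arrays.asList("word-count")));
        if (metadata.hasStore("word-count")) {
            // Build the URL the application's own RPC layer would call.
            System.out.println("http://" + metadata.host() + ":" + metadata.port()
                + "/get/word-count/hello");
        }
    }
}
```

With hasStore in place, the "iterate over all instances" example collapses to a single readable condition per instance.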
>> Anyway, i'm interested in your thoughts.
>> 
>> Thanks,
>> Damian
>> 
>> On Mon, 11 Jul 2016 at 13:47 Guozhang Wang <wangg...@gmail.com> wrote:
>> 
>>> 1. Re StreamsConfig.USER_ENDPOINT_CONFIG:
>>> 
>>> I agree with Neha that Kafka Streams can provide the bare minimum APIs
>> just
>>> for host/port, and user's implemented layer can provide URL / proxy
>> address
>>> they want to build on top of it.
>>> 
>>> 
>>> 2. Re Improving KafkaStreamsInstance interface:
>>> 
>>> Users are indeed aware of "TaskId" class which is not part of internal
>>> packages and is exposed in PartitionGrouper interface that can be
>>> instantiated by the users, which is assigned with input topic partitions.
>>> So we can probably change the APIs as:
>>> 
>>> Map<HostState, Set<TaskMetadata>> KafkaStreams.getAllTasks() where
>>> TaskMetadata has fields such as taskId, list of assigned partitions, list
>>> of state store names; and HostState can include hostname / port. The port
>>> is the listening port of a user-defined listener that users provide to
>>> listen for queries (e.g., using REST APIs).
>>> 
>>> Map<HostState, Set<TaskMetadata>> KafkaStreams.getTasksWithStore(String
>> /*
>>> storeName */) would return only the hosts and their assigned tasks if at
>>> least one of the tasks includes the given store name.
>>> 
>>> Map<HostState, Set<TaskMetadata>> KafkaStreams.getTaskWithStoreAndKey(Key
>>> k, String /* storeName */, StreamPartitioner partitioner) would return
>> only
>>> the host and their assigned task if the store with the store name has a
>>> particular key, according to the partitioner behavior.
>>> 
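For comparison, the getTasksWithStore() semantics described above can be sketched on plain collections. This is only an illustration under assumptions: HostState and TaskMetadata here are hypothetical minimal classes (taskId as a String, no partitions), and tasksWithStore is a free-standing helper rather than a KafkaStreams method.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical host descriptor; needs equals/hashCode because it is used as a map key.
final class HostState {
    final String host;
    final int port;

    HostState(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof HostState)) return false;
        HostState other = (HostState) o;
        return port == other.port && host.equals(other.host);
    }

    @Override
    public int hashCode() { return Objects.hash(host, port); }
}

// Hypothetical task descriptor: a task id plus the state stores the task holds.
final class TaskMetadata {
    final String taskId;
    final Set<String> stateStoreNames;

    TaskMetadata(String taskId, Set<String> stateStoreNames) {
        this.taskId = taskId;
        this.stateStoreNames = stateStoreNames;
    }
}

final class TaskLookup {
    // Keeps a host, with all of its tasks, if at least one task has the store.
    static Map<HostState, Set<TaskMetadata>> tasksWithStore(
            Map<HostState, Set<TaskMetadata>> allTasks, String storeName) {
        Map<HostState, Set<TaskMetadata>> result = new LinkedHashMap<>();
        for (Map.Entry<HostState, Set<TaskMetadata>> entry : allTasks.entrySet()) {
            for (TaskMetadata task : entry.getValue()) {
                if (task.stateStoreNames.contains(storeName)) {
                    result.put(entry.getKey(), entry.getValue());
                    break;
                }
            }
        }
        return result;
    }
}

class TaskLookupDemo {
    public static void main(String[] args) {
        Map<HostState, Set<TaskMetadata>> all = new LinkedHashMap<>();
        all.put(new HostState("host-a", 7070), new HashSet<>(Arrays.asList(
            new TaskMetadata("0_0", new HashSet<>(Arrays.asList("word-count"))))));
        all.put(new HostState("host-b", 7070), new HashSet<>(Arrays.asList(
            new TaskMetadata("1_0", new HashSet<>(Arrays.asList("other-store"))))));
        for (HostState h : TaskLookup.tasksWithStore(all, "word-count").keySet()) {
            System.out.println(h.host + ":" + h.port);
        }
    }
}
```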
>>> 
>>> 
>>> Guozhang
>>> 
>>> 
>>> On Sun, Jul 10, 2016 at 11:21 AM, Neha Narkhede <n...@confluent.io>
>> wrote:
>>> 
>>>> A few thoughts that became apparent after observing example code of what
>>>> an application architecture and code might look like with these changes.
>>>> Apologies for the late realization.
>>>> 
>>>> 1. "user.endpoint" will be very differently defined for respective
>>>> applications. I don't think Kafka Streams should generalize to accept
>> any
>>>> connection URL as we expect to only expose metadata expressed as
>> HostInfo
>>>> (which is defined by host & port) and hence need to interpret the
>>>> "user.endpoint" as host & port. Applications will have their own
>> endpoint
>>>> configs that will take many forms and they will be responsible for
>>> parsing
>>>> out host and port and configuring Kafka Streams accordingly.
>>>> 
>>>> If we are in fact limiting to host and port, I wonder if we should
>> change
>>>> the name of "user.endpoint" into something more specific. We have
>> clients
>>>> expose host/port pairs as "bootstrap.servers". Should this be
>>>> "application.server"?
>>>> 
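If the setting is limited to host and port, the parsing on the application side stays small. The following is a rough sketch, assuming the value has the form "host:port"; HostAndPort and ApplicationServerParser are hypothetical helper names, not part of Kafka Streams, and IPv6 literals are not handled specially.

```java
// Hypothetical value holder for a parsed endpoint.
final class HostAndPort {
    final String host;
    final int port;

    HostAndPort(String host, int port) {
        this.host = host;
        this.port = port;
    }
}

final class ApplicationServerParser {
    // Splits "host:port" at the last colon and validates the port range.
    static HostAndPort parse(String value) {
        if (value == null) {
            throw new IllegalArgumentException("endpoint must not be null");
        }
        int idx = value.lastIndexOf(':');
        if (idx <= 0 || idx == value.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + value);
        }
        String host = value.substring(0, idx).trim();
        int port;
        try {
            port = Integer.parseInt(value.substring(idx + 1).trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Port is not a number in: " + value, e);
        }
        if (host.isEmpty() || port < 0 || port > 65535) {
            throw new IllegalArgumentException("Invalid host or port in: " + value);
        }
        return new HostAndPort(host, port);
    }
}

class ApplicationServerParserDemo {
    public static void main(String[] args) {
        HostAndPort endpoint = ApplicationServerParser.parse("localhost:7070");
        System.out.println(endpoint.host + " / " + endpoint.port);
    }
}
```

An application that stores a full URL in its own config would extract the host and port first and pass only those on to Kafka Streams.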
>>>> 2. I don't think we should expose another abstraction called
>>>> KafkaStreamsInstance to the user. This is related to the discussion of
>>> the
>>>> right abstraction that we want to expose to an application. The
>>> abstraction
>>>> discussion itself should probably be part of the KIP itself, let me
>> give
>>> a
>>>> quick summary of my thoughts here:
>>>> 1. The person implementing an application using Queryable State has
>>> likely
>>>> already made some choices for the service layer–a REST framework,
>> Thrift,
>>>> or whatever. We don't really want to add another RPC framework to this
>>> mix,
>>>> nor do we want to try to make Kafka's RPC mechanism general purpose.
>>>> 2. Likewise, it should be clear that the API you want to expose to the
>>>> front-end/client service is not necessarily the API you'd need
>> internally
>>>> as there may be additional filtering/processing in the router.
>>>> 
>>>> Given these constraints, what we prefer to add is a fairly low-level
>>>> "toolbox" that would let you do anything you want, but requires you to
>>>> route and perform any aggregation or processing yourself. This pattern is
>>>> not recommended for all kinds of services/apps, but there is definitely a
>>>> category of things where it is a big win, and other advanced applications
>>>> are out-of-scope.
>>>> 
>>>> The APIs we expose should take the following things into consideration:
>>>> 1. Make it clear to the user that they will do the routing,
>> aggregation,
>>>> processing themselves. So the bare minimum that we want to expose is
>>> store
>>>> and partition metadata per application server identified by the host
>> and
>>>> port.
>>>> 2. Ensure that the API exposes abstractions that are known to the user
>> or
>>>> are intuitive to the user.
>>>> 3. Avoid exposing internal objects or implementation details to the
>> user.
>>>> 
>>>> So tying all this into answering the question of what we should expose
>>>> through the APIs -
>>>> 
>>>> In Kafka Streams, the user is aware of the concept of tasks and
>>> partitions
>>>> since the application scales with the number of partitions and tasks
>> are
>>>> the construct for logical parallelism. The user is also aware of the
>>>> concept of state stores though until now they were not user accessible.
>>>> With Queryable State, the bare minimum abstractions that we need to
>>> expose
>>>> are state stores and the location of state store partitions.
>>>> 
>>>> For exposing the state stores, the getStore() APIs look good but I
>> think
>>>> for locating the state store partitions, we should go back to the
>>> original
>>>> proposal of simply exposing some sort of getPartitionMetadata() that
>>>> returns a PartitionMetadata or TaskMetadata object keyed by HostInfo.
>>>> 
>>>> The application will convert the HostInfo (host and port) into some
>>>> connection URL to talk to the other app instances via its own RPC
>>> mechanism
>>>> depending on whether it needs to scatter-gather or just query. The
>>>> application will know how a key maps to a partition and through
>>>> PartitionMetadata it will know how to locate the server that hosts the
>>>> store that has the partition hosting that key.
>>>> 
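The lookup chain described above (key to partition, partition to host) can be sketched as follows. Everything here is an assumption for illustration: KeyRouter is a hypothetical helper, the partition-to-host map stands in for whatever the metadata API ends up returning, and the hash is a simple placeholder, not Kafka's default partitioner (which is murmur2-based). A real application must use the same partitioning that wrote the data.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

final class KeyRouter {
    // Placeholder partitioning: hash of the key bytes, made non-negative,
    // modulo the partition count. NOT Kafka's default partitioner.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (Arrays.hashCode(bytes) & 0x7fffffff) % numPartitions;
    }

    // Resolves the "host:port" of the instance that hosts the key's partition.
    static String hostFor(String key, int numPartitions,
                          Map<Integer, String> partitionToHost) {
        int partition = partitionFor(key, numPartitions);
        String hostPort = partitionToHost.get(partition);
        if (hostPort == null) {
            throw new IllegalStateException("No host known for partition " + partition);
        }
        return hostPort;
    }
}

class KeyRouterDemo {
    public static void main(String[] args) {
        // Hypothetical metadata: which instance hosts which store partition.
        Map<Integer, String> partitionToHost = new HashMap<>();
        partitionToHost.put(0, "host-a:7070");
        partitionToHost.put(1, "host-b:7070");

        String target = KeyRouter.hostFor("hello", 2, partitionToHost);
        System.out.println("http://" + target + "/get/word-count/hello");
    }
}
```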
>>>> On Fri, Jul 8, 2016 at 9:40 AM, Michael Noll <mich...@confluent.io>
>>> wrote:
>>>> 
>>>>> Addendum in case my previous email wasn't clear:
>>>>> 
>>>>>> So for any given instance of a streams application there will never
>>> be
>>>>> both a v1 and v2 alive at the same time
>>>>> 
>>>>> That's right.  But the current live instance will be able to tell
>> other
>>>>> instances, via its endpoint setting, whether it wants to be contacted
>>> at
>>>> v1
>>>>> or at v2.  The other instances can't guess that.  Think: if an older
>>>>> instance would manually compose the "rest" of an endpoint URI, having
>>>> only
>>>>> the host and port from the endpoint setting, it might not know that
>> the
>>>> new
>>>>> instances have a different endpoint suffix, for example.
>>>>> 
>>>>> 
>>>>> On Fri, Jul 8, 2016 at 6:37 PM, Michael Noll <mich...@confluent.io>
>>>> wrote:
>>>>> 
>>>>>> Damian,
>>>>>> 
>>>>>> about the rolling upgrade comment:  An instance A will contact
>>> another
>>>>>> instance B by the latter's endpoint, right?  So if A has no further
>>>>>> information available than B's host and port, then how should
>>> instance
>>>> A
>>>>>> know whether it should call B at /v1/ or at /v2/?  I agree that my
>>>>>> suggestion isn't foolproof, but it is afaict better than the
>>> host:port
>>>>>> approach.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Fri, Jul 8, 2016 at 5:15 PM, Damian Guy <damian....@gmail.com>
>>>> wrote:
>>>>>> 
>>>>>>> Michael - i'm ok with changing it to a string. Any one else have a
>>>>> strong
>>>>>>> opinion on this?
>>>>>>> 
>>>>>>> FWIW - i don't think it will work fine as is during the rolling
>>>> upgrade
>>>>>>> scenario as the service that is listening on the port needs to be
>>>>> embedded
>>>>>>> within each instance. So for any given instance of a streams
>>>> application
>>>>>>> there will never be both a v1 and v2 alive at the same time
>> (unless
>>> of
>>>>>>> course the process didn't shutdown properly, but then you have
>>> another
>>>>>>> problem...).
>>>>>>> 
>>>>>>> On Fri, 8 Jul 2016 at 15:26 Michael Noll <mich...@confluent.io>
>>>> wrote:
>>>>>>> 
>>>>>>>> I have one further comment about
>>>> `StreamsConfig.USER_ENDPOINT_CONFIG`.
>>>>>>>> 
>>>>>>>> I think we should consider not restricting the value of this setting to
>>>>>>>> only `host:port` pairs.  By design, this setting is capturing
>>>>>>> user-driven
>>>>>>>> metadata to define an endpoint, so why restrict the creativity
>> or
>>>>>>>> flexibility of our users?  I can imagine, for example, that
>> users
>>>>> would
>>>>>>>> like to set values such as `https://host:port/api/rest/v1/` in
>>> this
>>>>>>> field
>>>>>>>> (e.g. being able to distinguish between `.../v1/` and `.../v2/`
>>> may
>>>>>>> help in
>>>>>>>> scenarios such as rolling upgrades, where, during the upgrade,
>>> older
>>>>>>>> instances may need to coexist with newer instances).
>>>>>>>> 
>>>>>>>> That said, I don't have a strong opinion here.
>>>>>>>> 
>>>>>>>> -Michael
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Fri, Jul 8, 2016 at 2:55 PM, Matthias J. Sax <
>>>>> matth...@confluent.io>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> +1
>>>>>>>>> 
>>>>>>>>> On 07/08/2016 11:03 AM, Eno Thereska wrote:
>>>>>>>>>> +1 (non-binding)
>>>>>>>>>> 
>>>>>>>>>>> On 7 Jul 2016, at 18:31, Sriram Subramanian <
>>> r...@confluent.io>
>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> +1
>>>>>>>>>>> 
>>>>>>>>>>> On Thu, Jul 7, 2016 at 9:53 AM, Henry Cai
>>>>>>> <h...@pinterest.com.invalid
>>>>>>>>> 
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> +1
>>>>>>>>>>>> 
>>>>>>>>>>>> On Thu, Jul 7, 2016 at 6:48 AM, Michael Noll <
>>>>>>> mich...@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> +1 (non-binding)
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Thu, Jul 7, 2016 at 10:24 AM, Damian Guy <
>>>>>>> damian....@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks Henry - we've updated the KIP with an example and
>>> the
>>>>> new
>>>>>>>>> config
>>>>>>>>>>>>>> parameter required. FWIW the user doesn't register a
>>>> listener,
>>>>>>> they
>>>>>>>>>>>>> provide
>>>>>>>>>>>>>> a host:port in config. It is expected they will start a
>>>>> service
>>>>>>>>> running
>>>>>>>>>>>>> on
>>>>>>>>>>>>>> that host:port that they can use to connect to the
>> running
>>>>>>>>> KafkaStreams
>>>>>>>>>>>>>> Instance.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Thu, 7 Jul 2016 at 06:06 Henry Cai
>>>>>>> <h...@pinterest.com.invalid>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> It wasn't quite clear to me how the user program interacts with the
>>>>>>>>>>>>>>> discovery API, especially the user-supplied listener part: how does
>>>>>>>>>>>>>>> the user program supply that listener to KafkaStreams, and how does
>>>>>>>>>>>>>>> KafkaStreams know which port the user listener is running on? Maybe a
>>>>>>>>>>>>>>> more complete end-to-end example would help, including the steps for
>>>>>>>>>>>>>>> registering the user listener and whether the user listener needs to
>>>>>>>>>>>>>>> be involved with task reassignment.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 9:13 PM, Guozhang Wang <
>>>>>>> wangg...@gmail.com
>>>>>>>>> 
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> +1
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 12:44 PM, Damian Guy <
>>>>>>>> damian....@gmail.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I'd like to initiate the voting process for KIP-67
>>>>>>>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams>
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> KAFKA-3909 <https://issues.apache.org/jira/browse/KAFKA-3909> is the
>>>>>>>>>>>>>>>>> top level JIRA for this effort.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Initial PRs for Step 1 of the process are:
>>>>>>>>>>>>>>>>> Expose State Store Names <https://github.com/apache/kafka/pull/1526> and
>>>>>>>>>>>>>>>>> Query Local State Stores <https://github.com/apache/kafka/pull/1565>
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>> Michael Noll
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> *Michael G. Noll | Product Manager | Confluent | +1
>>>>>>>>> 650.453.5860Download
>>>>>>>>>>>>> Apache Kafka and Confluent Platform:
>>>> www.confluent.io/download
>>>>>>>>>>>>> <http://www.confluent.io/download>*
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> --
>>>>>>>> Best regards,
>>>>>>>> Michael Noll
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> *Michael G. Noll | Product Manager | Confluent | +1
>>>>> 650.453.5860Download
>>>>>>>> Apache Kafka and Confluent Platform: www.confluent.io/download
>>>>>>>> <http://www.confluent.io/download>*
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> Best regards,
>>>>>> Michael Noll
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
>>>>>> <%2B1%20650.453.5860>Download Apache Kafka and Confluent Platform:
>>>>>> www.confluent.io/download <http://www.confluent.io/download>*
>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> Best regards,
>>>>> Michael Noll
>>>>> 
>>>>> 
>>>>> 
>>>>> *Michael G. Noll | Product Manager | Confluent | +1
>>> 650.453.5860Download
>>>>> Apache Kafka and Confluent Platform: www.confluent.io/download
>>>>> <http://www.confluent.io/download>*
>>>>> 
>>>> 
>>>> 
>>>> 
>>>> --
>>>> Thanks,
>>>> Neha
>>>> 
>>> 
>>> 
>>> 
>>> --
>>> -- Guozhang
>>> 
>> 
