Thanks for updating the KIP! I think it's good as is -- I would not add anything more to TaskMetadata.
About subtopologies and tasks: we already have the concept of subtopologies in KIP-120. The only thing missing is an ID that allows linking a subtopology to a task. IMHO, adding a simple variable to `Subtopology` that provides the id should be sufficient. We can simply document in the JavaDocs how `Subtopology` and `TaskMetadata` can be linked to each other. I updated KIP-120 accordingly.

-Matthias

On 3/28/17 3:45 PM, Florian Hussonnois wrote:
> Hi all,
>
> I've updated the KIP and the PR to reflect your suggestions.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
> https://github.com/apache/kafka/pull/2612
>
> Also, I've exposed the property StreamThread#state as a string through the new class ThreadMetadata.
>
> Thanks,

2017-03-27 23:40 GMT+02:00 Florian Hussonnois <fhussonn...@gmail.com>:
> Hi Guozhang, Matthias,
>
> It's a great idea to add sub-topology descriptions. This would help developers better understand the topology concept.
>
> I agree that it is not really user-friendly to have to check whether `StreamsMetadata#streamThreads` returns null.
>
> The method name localThreadsMetadata looks good. In addition, it's simpler to build ThreadMetadata instances from the `StreamTask` class than from the `StreamPartitionAssignor` class.
>
> I will work on the modifications. As I understand it, I have to add the subTopologyId property to the TaskMetadata class. Am I right?
>
> Thanks,

2017-03-26 0:25 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
> Re 1): this is a good point. Maybe we can move `StreamsMetadata#streamThreads` to `KafkaStreams#localThreadsMetadata`?
>
> 3): a minor suggestion: rename the function `assignedPartitions` to `topicPartitions`, to be consistent with `StreamsMetadata`?
>
> Guozhang

On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> Thanks for the progress on this KIP. I think we are on the right path!
>
> A couple of comments/questions:
>
> (1) Why do we not consider the "rejected alternative" of adding the method to KafkaStreams? The comment on #streamThreads() says:
>
> "Note this method will return <code>null</code> if called on {@link StreamsMetadata} which represent a remote application."
>
> Thus, if we cannot get any remote metadata, it seems more straightforward to add it to KafkaStreams directly -- this would avoid invalid calls and the `null` return value in the first place.
>
> I like the idea of exposing sub-topologies:
>
> (2a) I would recommend renaming `topicsGroupId` to `subTopologyId` :)
>
> (2b) We could add this to KIP-120 already. However, I would not just link both via name, but leverage KIP-120 directly and add a "Subtopology" member to the TaskMetadata class.
>
> Overall, I like the distinction of KIP-120 only exposing "static" information that can be determined before the topology gets started, while this KIP allows access to runtime information.
>
> -Matthias

On 3/22/17 12:42 PM, Guozhang Wang wrote:
> Thanks for the updated KIP, and sorry for the late replies!
>
> I thought a little bit more about KIP-130, and I feel that if we are going to deprecate the `toString` function with the proposed ones (it is not explicitly said in the KIP, so I'm not sure if you plan to keep `KafkaStreams#toString` as is or to replace it with the proposed APIs), it may be okay. More specifically, after both KIP-120 and KIP-130:
>
> 1. users can use the `#describe` function to check the generated topology before calling `KafkaStreams#start`, which is static information.
> 2. users can use `StreamsMetadata -> ThreadMetadata -> TaskMetadata` programmatically after calling `KafkaStreams#start` to get the dynamically changeable information.
>
> One thing I'm still not sure about, though, is that in `TaskMetadata` we only have the TaskId and assigned partitions, whereas the "TopologyDescription" introduced in KIP-120 will simply describe the whole topology, possibly composed of multiple sub-topologies. So it is hard for users to tell on-the-fly which sub-topology is executed under which task.
>
> Hence I'm wondering if we can expose the "sub-topology-id" (named topicsGroupId internally) in TopologyDescription#Subtopology; then, from the task id, which is essentially "sub-topology-id DASH partition-group-id", users can make the link, though it is still not that straightforward.
>
> Thoughts?
>
> Guozhang

On Wed, Mar 15, 2017 at 3:16 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> Thanks Guozhang for pointing me to KIP-120.
>
> I've made some modifications to the KIP. I also proposed a new PR (there are still some tests to write).
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>
> Exposing consumed offsets through JMX is sufficient for debugging purposes. But I think this could be part of another JIRA, as there is no impact on the public API.
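Guozhang's idea of recovering the sub-topology from a task id can be illustrated with a small, self-contained Java sketch. It does not use the Kafka Streams API: it works on plain task-id strings and assumes they have the form `<subtopologyId>_<partition>`, which is how Kafka's `TaskId#toString()` renders ids such as `0_1` (the thread informally calls the separator a dash). The class `TaskLink` and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helpers (not part of Kafka Streams) that link task ids to
// sub-topology ids by parsing the task id string.
final class TaskLink {

    private TaskLink() {}

    // Returns the sub-topology id encoded in a task id such as "1_2".
    // Assumes the "<subtopologyId>_<partition>" format.
    static int subtopologyOf(String taskId) {
        int sep = taskId.indexOf('_');
        if (sep <= 0 || sep == taskId.length() - 1) {
            throw new IllegalArgumentException("Unexpected task id format: " + taskId);
        }
        return Integer.parseInt(taskId.substring(0, sep));
    }

    // Groups task ids by sub-topology id, keeping the keys sorted.
    static Map<Integer, List<String>> groupBySubtopology(List<String> taskIds) {
        Map<Integer, List<String>> grouped = new TreeMap<>();
        for (String id : taskIds) {
            grouped.computeIfAbsent(subtopologyOf(id), k -> new ArrayList<>()).add(id);
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String> tasks = Arrays.asList("0_0", "0_1", "1_0", "1_1", "1_2");
        System.out.println(groupBySubtopology(tasks)); // {0=[0_0, 0_1], 1=[1_0, 1_1, 1_2]}
    }
}
```

Grouping task ids this way gives a UI one bucket per sub-topology; once `Subtopology` carries its own id, as proposed at the top of the thread, each bucket key can be matched against it.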
> Thanks

2017-03-10 22:35 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
> Hello Florian,
>
> As for programmatically discovering monitoring data by piping metrics into a dedicated topic: I think you can actually use a KafkaMetricsReporter that pipes the polled metric values into a pre-defined topic (note that in Kafka the MetricsReporter is simply an interface, and users can build their own impl in addition to the JMXReporter), for example:
>
> https://github.com/krux/kafka-metrics-reporter
>
> As for the "static task-level assignment", what I meant is that the mapping from source-topic-partitions -> tasks is static, via the "PartitionGrouper", and a task won't switch from an active task to a standby task. Rather, an active task could be migrated, as a whole along with all its assigned partitions, to another thread / process, and a new standby task will be created on the host that this active task is migrating from. So for the SAME task, its taskMetadata.assignedPartitions() will always return the same partitions.
>
> As for the `toString` function that we have today, I feel it has some correlation with KIP-120, so I'm trying to coordinate some discussions here (cc'ing Matthias as the owner of KIP-120). My understanding is that:
>
> 1. In KIP-120, the `toString` function of `KafkaStreams` will be removed and instead the `Topology#describe` function will be introduced for users to debug the topology BEFORE they start running their instance with the topology.
> And hence the description won't contain any task information, as tasks are not formed yet.
> 2. In KIP-130, we want to add the task-level information for monitoring purposes, which is not static and can only be captured AFTER the instance has started running. Again, I'm wondering whether, for KIP-130 alone, adding the metrics mentioned in my previous email would suffice even for the use case that you have mentioned.
>
> Guozhang

On Wed, Mar 8, 2017 at 3:18 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> Hi Guozhang
>
> Thank you for your feedback. I've started to look more deeply into the code. As you mention, it would be smarter to use the existing StreamsMetadata API to expose this information.
>
> I think exposing metrics through JMX is great for building monitoring dashboards with tools like jmxtrans and Grafana. But for our use case we would like to expose the states directly from the application embedding the kstreams topologies, so we expect to be able to retrieve states programmatically.
>
> For instance, we could imagine producing those states into a dedicated topic. That way a third application could automatically discover all Kafka Streams applications that could be monitored. In a production environment, that could clearly be a way to get a complete overview of a microservices architecture based on Kafka Streams.
>
> The toString() method gives a lot of information, but it can only be used for debugging purposes, not to build a topology visualization tool.
> Could we actually expose the same details about the stream topology from the StreamsMetadata API? So the TaskMetadata class you have suggested could contain information similar to what the toString method of the AbstractTask class returns?
>
> I can update the KIP in that way.
>
> Finally, I'm not sure I understand your last point: *"Note that the task-level assignment information is static, i.e. it will not change during the runtime"*
>
> Does that mean that when a rebalance occurs, new tasks are created for the new assignments and old ones just switch to a standby state?
>
> Thanks,

2017-03-05 7:04 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
> Hello Florian,
>
> Thanks for the KIP and your detailed explanation of your use case. I think there are two dimensions to discuss on how to improve Streams' debuggability (or more specifically, state exposure for visualization).
>
> The first question is "what information should we expose to the user". From your KIP I saw generally three categories:
>
> 1. The state of the thread within a process: as you mentioned, currently we only expose the state of the process but not the finer-grained per-thread state.
> 2. The state of the task. Currently the closest API to this is StreamsMetadata; however, it aggregates the tasks across all threads and only presents the aggregated set of the assigned partitions / state stores etc.
> We can consider extending it with a new StreamsMetadata#tasks() which returns TaskMetadata with similar fields; StreamsMetadata.stateStoreNames etc. would still return the aggregated results, but users could "drill down" if they want.
>
> The second question is "how should we expose them to the user". For example, you mentioned consumedOffsetsByPartition in the activeTasks. We could add this as a JMX metric based on fetch positions inside the consumer layer (note that Streams is just embedding consumers), or we could consider adding it to TaskMetadata. In either case it can be visualized for monitoring. The reason we expose StreamsMetadata as well as State is that they are expected to be "polled" programmatically for interactive queries and also for control flows (e.g. I would like to start running my other topology ONLY after the first topology is up and running), while in your case the main purpose seems to be continuously querying them for monitoring etc. Personally, I'd prefer to expose them only via JMX for such purposes, to keep the API simpler.
>
> So, given your current motivations, I'd suggest exposing the following information as newly added metrics in Streams:
>
> 1. Thread-level state metric.
> 2. Task-level hosted client identifier metric (e.g. host:port).
> 3. Consumer-level per-topic/partition position metric (https://kafka.apache.org/documentation/#topic_fetch_monitoring).
>
> Note that the task-level assignment information is static, i.e. it will not change during the runtime at all, and it can already be accessed from the `toString()` function even before the instance starts running, so I think this piece of information does not need to be exposed through JMX.
>
> WDYT?
>
> Guozhang

On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy <damian....@gmail.com> wrote:
> Hi Florian,
>
> Thanks for the KIP.
>
> It seems there is some overlap here with what we already have in KafkaStreams.allMetadata(). This currently returns a Collection<StreamsMetadata> where each StreamsMetadata instance holds the state stores and partition assignment for every instance of the KafkaStreams application. I'm wondering if that is good enough for what you are trying to achieve? If not, could it be modified to include the per-thread assignment?
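The "drill down" Guozhang describes, and the per-thread assignment Damian asks about, can be sketched in plain Java. `DrillDown`, `TaskInfo` and `ThreadInfo` are hypothetical, simplified stand-ins (not the Kafka Streams `ThreadMetadata` / `TaskMetadata` classes), and partition names such as `orders-0` are made up. The instance-level view is computed as the union of every thread's task partitions, while the drill-down answers which thread runs a given task. The second half of `main` models a rebalance in which task `0_1` moves to another thread together with its partitions, following Guozhang's description of static task-level assignment.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified stand-ins for the per-thread / per-task metadata
// discussed in this thread; these are NOT the Kafka Streams classes.
final class DrillDown {

    static final class TaskInfo {
        final String taskId;
        final Set<String> partitions; // e.g. "orders-0"
        TaskInfo(String taskId, String... partitions) {
            this.taskId = taskId;
            this.partitions = Collections.unmodifiableSet(new TreeSet<>(Arrays.asList(partitions)));
        }
    }

    static final class ThreadInfo {
        final String threadName;
        final String state; // a plain string, as proposed for ThreadMetadata
        final List<TaskInfo> activeTasks;
        ThreadInfo(String threadName, String state, TaskInfo... tasks) {
            this.threadName = threadName;
            this.state = state;
            this.activeTasks = Arrays.asList(tasks);
        }
    }

    // Instance-level view: union of the partitions of all tasks on all threads.
    static Set<String> aggregatedPartitions(List<ThreadInfo> threads) {
        Set<String> all = new TreeSet<>();
        for (ThreadInfo t : threads) {
            for (TaskInfo task : t.activeTasks) {
                all.addAll(task.partitions);
            }
        }
        return all;
    }

    // Drill-down view: the thread currently running `taskId`, or null if none.
    static String threadRunning(List<ThreadInfo> threads, String taskId) {
        for (ThreadInfo t : threads) {
            for (TaskInfo task : t.activeTasks) {
                if (task.taskId.equals(taskId)) {
                    return t.threadName;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<ThreadInfo> before = Arrays.asList(
                new ThreadInfo("StreamThread-1", "RUNNING", new TaskInfo("0_0", "orders-0")),
                new ThreadInfo("StreamThread-2", "RUNNING", new TaskInfo("0_1", "orders-1")));
        System.out.println(aggregatedPartitions(before)); // [orders-0, orders-1]
        System.out.println(threadRunning(before, "0_1")); // StreamThread-2

        // Task 0_1 migrates with all of its partitions; its partition set is unchanged.
        List<ThreadInfo> after = Arrays.asList(
                new ThreadInfo("StreamThread-1", "RUNNING",
                        new TaskInfo("0_0", "orders-0"), new TaskInfo("0_1", "orders-1")),
                new ThreadInfo("StreamThread-2", "RUNNING"));
        System.out.println(aggregatedPartitions(after)); // [orders-0, orders-1]
        System.out.println(threadRunning(after, "0_1")); // StreamThread-1
    }
}
```

Deriving the aggregate from the per-thread data means the two views cannot drift apart, which is one way to keep aggregate methods such as `stateStoreNames` while still allowing the drill-down.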
> Thanks,
> Damian

On Wed, 1 Mar 2017 at 22:49 Florian Hussonnois <fhussonn...@gmail.com> wrote:
> Hi Matthias,
>
> First, I will answer your last question.
>
> The main reason to have both TaskState#assignment and TaskState#consumedOffsetsByPartition is that tasks have no consumed offsets until at least one message has been consumed for each partition, even if previous offsets exist for the consumer group. So yes, these methods are redundant, as they only diverge at application startup.
>
> About the use case: we are currently developing, for a customer, a small framework based on KafkaStreams that transforms/denormalizes data before ingesting it into Hadoop.
>
> We have a cluster of workers (Spring Boot) that instantiate KStreams topologies dynamically based on dataflow configurations. Each configuration describes a topic to consume and how to process messages (this looks like the NiFi processors API).
>
> Our architecture is inspired by Kafka Connect. We have topics for configs and states that are consumed by each worker (actually we have reused some internal classes of the Connect API).
>
> Now, we would like to develop UIs to visualize the topics and partitions consumed by our worker applications.
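Florian's explanation of why `TaskState#assignment` and `TaskState#consumedOffsetsByPartition` can differ is easy to see in a toy model. The class below is a hypothetical sketch, not Kafka code: it assumes the offsets map is filled only when a record from a partition is actually processed, so right after startup the map is empty even though the task already owns its partitions (and the consumer group may already have committed offsets).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical toy model of the two TaskState fields discussed in the thread;
// it is not a Kafka class and tracks partitions as plain strings.
final class TaskStateSketch {

    private final Set<String> assignment;
    private final Map<String, Long> consumedOffsetsByPartition = new HashMap<>();

    TaskStateSketch(Set<String> assignment) {
        this.assignment = Collections.unmodifiableSet(new TreeSet<>(assignment));
    }

    // Records the offset of a processed record; only assigned partitions are accepted.
    void recordConsumed(String partition, long offset) {
        if (!assignment.contains(partition)) {
            throw new IllegalArgumentException("Not assigned to this task: " + partition);
        }
        consumedOffsetsByPartition.put(partition, offset);
    }

    Set<String> assignment() {
        return assignment;
    }

    Map<String, Long> consumedOffsetsByPartition() {
        return Collections.unmodifiableMap(consumedOffsetsByPartition);
    }

    // Assigned partitions for which no record has been processed yet.
    Set<String> notYetConsumed() {
        Set<String> pending = new TreeSet<>(assignment);
        pending.removeAll(consumedOffsetsByPartition.keySet());
        return pending;
    }

    public static void main(String[] args) {
        TaskStateSketch task = new TaskStateSketch(new TreeSet<>(Arrays.asList("orders-0", "orders-1")));
        System.out.println(task.notYetConsumed()); // [orders-0, orders-1] (startup: no offsets yet)
        task.recordConsumed("orders-0", 42L);
        System.out.println(task.notYetConsumed()); // [orders-1]
        task.recordConsumed("orders-1", 7L);
        System.out.println(task.assignment().equals(task.consumedOffsetsByPartition().keySet())); // true
    }
}
```

Once every assigned partition has produced at least one record, `assignment()` equals `consumedOffsetsByPartition().keySet()`, which is the redundancy Matthias points out; the two only diverge during that startup window.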
> Also, I think it would be nice to be able, in the future, to develop web UIs similar to Spark's, but for KafkaStreams, to visualize DAGs... so maybe this KIP is just a first step.
>
> Thanks,

2017-03-01 22:52 GMT+01:00 Matthias J. Sax <matth...@confluent.io>:
> Thanks for the KIP.
>
> I am wondering a little bit why you need to expose this information. Can you describe some use cases?
>
> Would it be worth unifying this new API with KafkaStreams#state() to get the overall state of an application without the need to call two different methods? Not sure what this unified API might look like, though.
>
> One minor comment about the API: TaskState#assignment seems to be redundant. It should be the same as TaskState#consumedOffsetsByPartition.keySet().
>
> Or am I missing something?
>
> -Matthias

On 3/1/17 5:19 AM, Florian Hussonnois wrote:
> Hi Eno,
>
> Yes, but the state() method only returns the global state of the KafkaStreams application (i.e. CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING).
>
> An alternative to this KIP would be to change this method to return more information instead of adding a new method.
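Guozhang notes in this thread that `KafkaStreams#state()` is meant to be polled programmatically, for example to start a second topology only once the first one is running. A minimal polling helper is sketched below. So that it runs without Kafka on the classpath, it uses a hypothetical local enum `AppState` copying the state names Florian lists instead of the real `KafkaStreams.State`; because the helper is generic, with Kafka available one could pass `streams::state` and `KafkaStreams.State.RUNNING` instead. The class name `StateWait` and its parameters are illustrative.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

final class StateWait {

    // Hypothetical local copy of the application states listed in this thread,
    // used instead of KafkaStreams.State so the sketch runs without Kafka.
    enum AppState { CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING }

    private StateWait() {}

    // Polls `current` until it equals `target` or `timeoutMs` elapses.
    // Returns true if the target state was observed in time, false on timeout
    // or if the waiting thread is interrupted.
    static <S> boolean awaitState(Supplier<S> current, S target,
                                  long timeoutMs, long pollIntervalMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (target.equals(current.get())) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Simulated first application moving CREATED -> REBALANCING -> RUNNING,
        // one step per poll; afterwards it keeps reporting its last state.
        Iterator<AppState> steps = Arrays.asList(
                AppState.CREATED, AppState.REBALANCING, AppState.RUNNING).iterator();
        AppState[] last = {AppState.CREATED};
        Supplier<AppState> firstApp = () -> steps.hasNext() ? (last[0] = steps.next()) : last[0];

        if (awaitState(firstApp, AppState.RUNNING, 1_000, 10)) {
            System.out.println("first topology is RUNNING; starting the second one");
        }
    }
}
```

Polling keeps the sketch simple; recent Kafka Streams releases also offer `KafkaStreams#setStateListener`, a push-based alternative that avoids the sleep loop.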
2017-03-01 13:46 GMT+01:00 Eno Thereska <eno.there...@gmail.com>:
> Thanks Florian,
>
> Have you had a chance to look at the new state methods in 0.10.2, e.g., KafkaStreams.state()?
>
> Thanks
> Eno

On 1 Mar 2017, at 11:54, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> Hi all,
>
> I have just created KIP-130 to add a new method to the KafkaStreams API in order to expose the states of threads and active tasks.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>
> Thanks,
>
> --
> Florian HUSSONNOIS