Hi all,

I've updated the KIP and the PR to reflect your suggestions.
https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
https://github.com/apache/kafka/pull/2612
Also, I've exposed the StreamThread#state property as a string through the new ThreadMetadata class.

Thanks,

2017-03-27 23:40 GMT+02:00 Florian Hussonnois <fhussonn...@gmail.com>:

Hi Guozhang, Matthias,

It's a great idea to add sub-topology descriptions. This would help developers better understand the topology concept.

I agree that it is not really user-friendly to have to check whether `StreamsMetadata#streamThreads` returns null.

The method name localThreadsMetadata looks good. In addition, it's simpler to build ThreadMetadata instances from the `StreamTask` class than from the `StreamPartitionAssignor` class.

I will work on the modifications. As I understand it, I have to add the subTopologyId property to the TaskMetadata class. Am I right?

Thanks,

2017-03-26 0:25 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:

Re 1): this is a good point. Maybe we can move `StreamsMetadata#streamThreads` to `KafkaStreams#localThreadsMetadata`?

3): this is a minor suggestion about the name of the `assignedPartitions` function: rename it to `topicPartitions` to be consistent with `StreamsMetadata`?

Guozhang

On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Thanks for the progress on this KIP. I think we are on the right path!

A couple of comments/questions:

(1) Why do we not consider the "rejected alternative" of adding the method to KafkaStreams? The comment on #streamThreads() says:

"Note this method will return <code>null</code> if called on {@link StreamsMetadata} which represent a remote application."

Thus, as we cannot get any remote metadata anyway, it seems more straightforward to add it to KafkaStreams directly. This would avoid invalid calls and the `null` return value in the first place.
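To make point (1) concrete, here is a minimal Java sketch. All classes in it (`TaskMetadataSketch`, `ThreadMetadataSketch`, `LocalInstanceSketch`) are hypothetical stand-ins that only mirror the shapes discussed in this thread (a thread name, a thread state exposed as a string, per-task ids and topic partitions); they are not the Kafka Streams classes. What it illustrates: an accessor that only ever describes the local process can return an empty set instead of `null`.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-ins mirroring the shapes discussed in this thread;
// these are NOT the real Kafka Streams classes.
final class TaskMetadataSketch {
    final String taskId;               // e.g. "0_1"
    final Set<String> topicPartitions; // e.g. "orders-1"

    TaskMetadataSketch(String taskId, Set<String> topicPartitions) {
        this.taskId = taskId;
        this.topicPartitions = Collections.unmodifiableSet(new LinkedHashSet<>(topicPartitions));
    }
}

final class ThreadMetadataSketch {
    final String threadName;
    final String threadState; // thread state exposed as a plain string
    final Set<TaskMetadataSketch> activeTasks;

    ThreadMetadataSketch(String threadName, String threadState, Set<TaskMetadataSketch> activeTasks) {
        this.threadName = threadName;
        this.threadState = threadState;
        this.activeTasks = Collections.unmodifiableSet(new LinkedHashSet<>(activeTasks));
    }
}

final class LocalInstanceSketch {
    private final Set<ThreadMetadataSketch> threads = new LinkedHashSet<>();

    void addThread(ThreadMetadataSketch thread) {
        threads.add(thread);
    }

    // Hypothetical local-only accessor: it never describes a remote instance,
    // so it can always return a (possibly empty) set rather than null.
    Set<ThreadMetadataSketch> localThreadsMetadata() {
        return Collections.unmodifiableSet(threads);
    }
}
```

A caller can iterate the result directly, without first checking whether the metadata describes a remote instance.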
I like the idea of exposing sub-topologies:

(2a) I would recommend renaming `topicsGroupId` to `subTopologyId` :)

(2b) We could add this to KIP-120 already. However, I would not just link both via name, but leverage KIP-120 directly and add a "Subtopology" member to the TaskMetadata class.

Overall, I like the distinction of KIP-120 only exposing "static" information that can be determined before the topology gets started, while this KIP allows access to runtime information.

-Matthias

On 3/22/17 12:42 PM, Guozhang Wang wrote:

Thanks for the updated KIP, and sorry for the late replies!

I thought a little bit more about KIP-130, and I feel that it may be okay if we are going to deprecate the `toString` function in favor of the proposed APIs (this is not explicitly said in the KIP, so I'm not sure whether you plan to keep `KafkaStreams#toString` as is or to replace it with the proposed APIs). More specifically, after both KIP-120 and KIP-130:

1. users can use the `#describe` function to check the generated topology before calling `KafkaStreams#start`, which is static information.
2. users can use `StreamsMetadata -> ThreadMetadata -> TaskMetadata` programmatically after calling `KafkaStreams#start` to get the dynamically changeable information.

One thing I'm still not sure about, though, is that in `TaskMetadata` we only have the TaskId and the assigned partitions, whereas "TopologyDescription", introduced in KIP-120, will simply describe the whole topology, possibly composed of multiple sub-topologies. So it is hard for users to tell on the fly which sub-topology is executed under which task.
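Linking a task back to its sub-topology amounts to parsing the task id. A hedged sketch with the hypothetical helper `TaskIdLink`: it assumes the id string has the form `<subTopologyId><separator><partition>`. The thread describes the separator as a dash, while Kafka's `TaskId#toString()` renders ids like `0_1`, so the sketch accepts either.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: assumes task ids of the form "<subTopologyId><sep><partition>".
final class TaskIdLink {
    final int subTopologyId;
    final int partition;

    private TaskIdLink(int subTopologyId, int partition) {
        this.subTopologyId = subTopologyId;
        this.partition = partition;
    }

    // Accepts '_' (as in "0_1") or '-' (as described in the thread) as the separator.
    static TaskIdLink parse(String taskId) {
        int sep = Math.max(taskId.lastIndexOf('_'), taskId.lastIndexOf('-'));
        if (sep <= 0 || sep == taskId.length() - 1) {
            throw new IllegalArgumentException("Unexpected task id: " + taskId);
        }
        return new TaskIdLink(
                Integer.parseInt(taskId.substring(0, sep)),
                Integer.parseInt(taskId.substring(sep + 1)));
    }

    // Groups task ids by the sub-topology they execute, preserving input order.
    static Map<Integer, List<String>> groupBySubTopology(List<String> taskIds) {
        Map<Integer, List<String>> bySubTopology = new LinkedHashMap<>();
        for (String id : taskIds) {
            bySubTopology.computeIfAbsent(parse(id).subTopologyId, k -> new ArrayList<>()).add(id);
        }
        return bySubTopology;
    }
}
```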
Hence I'm wondering whether we can expose the "sub-topology-id" (named topicsGroupId internally) in TopologyDescription#Subtopology; then, from the task id, which is essentially "sub-topology-id DASH partition-group-id", users can make the link, though it is still not that straightforward.

Thoughts?

Guozhang

On Wed, Mar 15, 2017 at 3:16 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:

Thanks Guozhang for pointing me to KIP-120.

I've made some modifications to the KIP. I also proposed a new PR (there are still some tests to write).
https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API

Exposing consumed offsets through JMX is sufficient for debugging purposes. But I think this could be part of another JIRA, as there is no impact on the public API.

Thanks

2017-03-10 22:35 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:

Hello Florian,

As for programmatically discovering monitoring data by piping metrics into a dedicated topic.
I think you can actually use a KafkaMetricsReporter which pipes the polled metric values into a pre-defined topic (note that in Kafka the MetricsReporter is simply an interface, and users can build their own implementation in addition to the JMXReporter), for example:

https://github.com/krux/kafka-metrics-reporter

As for the "static task-level assignment", what I meant is that the mapping from source topic partitions to tasks is static, via the "PartitionGrouper", and a task won't switch from an active task to a standby task. Rather, an active task could be migrated, as a whole along with all its assigned partitions, to another thread / process, and a new standby task will be created on the host that the active task is migrating from. So for the SAME task, its taskMetadata.assignedPartitions() will always return the same partitions.

As for the `toString` function that we have today, I feel it has some correlation with KIP-120, so I'm trying to coordinate some discussions here (cc'ing Matthias as the owner of KIP-120). My understanding is that:

1. In KIP-120, the `toString` function of `KafkaStreams` will be removed and the `Topology#describe` function will be introduced instead, for users to debug the topology BEFORE they start running their instance with it. Hence the description won't contain any task information, as tasks are not formed yet.
2. In KIP-130, we want to add the task-level information for monitoring purposes, which is not static and can only be captured AFTER the instance has started running.
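The static source-partition-to-task mapping Guozhang describes can be illustrated with a small sketch. It assumes a grouping where task `<subTopologyId>_<p>` receives partition `p` of every source topic in its sub-topology, and the number of tasks equals the largest partition count among those topics. This is our reading of the default `PartitionGrouper` behavior, not its actual code; `PartitionGroupingSketch` is a hypothetical name, and partitions are rendered as `topic-partition` strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a static partition-to-task grouping (not Kafka's code).
final class PartitionGroupingSketch {
    // Assigns partition p of every source topic in one sub-topology to task "<subTopologyId>_<p>".
    // The result depends only on the topology and the partition counts, so the same task
    // always receives the same partitions, wherever it happens to run.
    static Map<String, List<String>> group(int subTopologyId, Map<String, Integer> partitionsPerTopic) {
        int maxPartitions = 0;
        for (int count : partitionsPerTopic.values()) {
            maxPartitions = Math.max(maxPartitions, count);
        }
        Map<String, List<String>> tasks = new LinkedHashMap<>();
        for (int p = 0; p < maxPartitions; p++) {
            List<String> assigned = new ArrayList<>();
            for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
                if (p < topic.getValue()) {
                    assigned.add(topic.getKey() + "-" + p);
                }
            }
            tasks.put(subTopologyId + "_" + p, assigned);
        }
        return tasks;
    }
}
```

Because the function is deterministic, rerunning it after a rebalance yields the same task-to-partition mapping; only the thread or process hosting each task changes.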
Again, I'm wondering whether, for KIP-130 alone, adding the metrics mentioned in my previous email would suffice even for the use case you have mentioned.

Guozhang

On Wed, Mar 8, 2017 at 3:18 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:

Hi Guozhang

Thank you for your feedback. I've started to look more deeply into the code. As you mention, it would be more clever to use the current StreamsMetadata API to expose this information.

I think exposing metrics through JMX is great for building monitoring dashboards using tools like jmxtrans and Grafana. But for our use case we would like to expose the states directly from the application embedding the KStreams topologies, so we expect to be able to retrieve states programmatically.

For instance, we could imagine producing those states into a dedicated topic. That way, a third application could automatically discover all Kafka Streams applications that could be monitored. In a production environment, that could clearly be a way to get a complete overview of a microservices architecture based on Kafka Streams.

The toString() method gives a lot of information, but it can only be used for debugging purposes, not to build a topology visualization tool. Could we actually expose the same details about the stream topology from the StreamsMetadata API? So the TaskMetadata class you have suggested could contain information similar to that returned by the toString method of the AbstractTask class?

I can update the KIP in that way.
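Florian's idea of producing states into a dedicated topic needs some record format. The sketch below is purely hypothetical (the class name and the `thread=state` value format are made up for illustration) and stops short of the actual producer call. It keys each snapshot by application id so that, on a compacted topic, a consumer would see the latest snapshot per application.

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical record layout for publishing state snapshots to a dedicated topic.
final class StatusRecordSketch {
    final String key;   // application id: a compacted topic keeps the latest snapshot per key
    final String value; // "<thread>=<state>" pairs joined by ';', sorted by thread name

    StatusRecordSketch(String applicationId, Map<String, String> threadStates) {
        this.key = applicationId;
        StringJoiner joiner = new StringJoiner(";");
        // Sort by thread name so that identical states always produce identical values.
        for (Map.Entry<String, String> e : new TreeMap<>(threadStates).entrySet()) {
            joiner.add(e.getKey() + "=" + e.getValue());
        }
        this.value = joiner.toString();
    }
}
```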
Finally, I'm not sure I understand your last point: "Note that the task-level assignment information is static, i.e. it will not change during the runtime"

Does that mean that when a rebalance occurs, new tasks are created for the new assignments and the old ones just switch to a standby state?

Thanks,

2017-03-05 7:04 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:

Hello Florian,

Thanks for the KIP and your detailed explanation of your use case. I think there are two dimensions to discuss on how to improve Streams' debuggability (or, more specifically, state exposure for visualization).

The first question is "what information should we expose to the user". From your KIP I saw generally three categories:

1. The state of the thread within a process: as you mentioned, we currently only expose the state of the process but not the finer-grained per-thread state.
2. The state of the task. Currently the closest API to this is StreamsMetadata; however, it aggregates the tasks across all threads and only presents the aggregated set of the assigned partitions / state stores etc. We can consider extending it with a new StreamsMetadata#tasks() method which returns a TaskMetadata with similar fields; StreamsMetadata.stateStoreNames etc. would still return the aggregated results, but users can still "drill down" if they want.
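The "drill down" relationship in point 2 can be sketched as follows: the per-task view maps each task id to its partitions, and the instance-level view is simply their union. `DrillDownSketch` and its method names are hypothetical; they only illustrate the aggregation, not how StreamsMetadata is implemented.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration of aggregated vs. per-task views of an assignment.
final class DrillDownSketch {
    // Instance-level view: the union of all tasks' partitions (what an aggregated view reports).
    static Set<String> aggregatePartitions(Map<String, Set<String>> partitionsByTask) {
        Set<String> all = new TreeSet<>();
        for (Set<String> partitions : partitionsByTask.values()) {
            all.addAll(partitions);
        }
        return all;
    }

    // Task-level view: which task owns a given partition, or null if no task does.
    static String ownerOf(Map<String, Set<String>> partitionsByTask, String partition) {
        for (Map.Entry<String, Set<String>> e : partitionsByTask.entrySet()) {
            if (e.getValue().contains(partition)) {
                return e.getKey();
            }
        }
        return null;
    }
}
```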
The second question is "how should we expose them to the user". For example, you mentioned consumedOffsetsByPartition in the activeTasks. We could add this as a JMX metric based on fetch positions inside the consumer layer (note that Streams just embeds consumers), or we could consider adding it to TaskMetadata. Either way it can be visualized for monitoring. The reason we expose StreamsMetadata as well as State is that they are expected to be "polled" programmatically for interactive queries and also for control flows (e.g. I would like to start running my other topology ONLY after the first topology is up and running), while in your case the main purpose seems to be continuously querying them for monitoring etc. Personally, for such purposes I'd prefer to expose them only via JMX, to keep the API simpler.

So, given your current motivations, I'd suggest exposing the following information as newly added metrics in Streams:

1. Thread-level state metric.
2. Task-level hosted client identifier metric (e.g. host:port).
3. Consumer-level per-topic/partition position metric (https://kafka.apache.org/documentation/#topic_fetch_monitoring).

Note that the task-level assignment information is static, i.e. it will not change during the runtime at all and can already be accessed from the `toString()` function even before the instance starts running, so I think this piece of information does not also need to be exposed through JMX.

WDYT?
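The "poll for control flow" use mentioned above (start a second topology only once the first is running) could look like the following sketch. `StartupGateSketch` and its nested `AppState` enum are hypothetical; the enum merely mirrors the application states listed elsewhere in this thread, and the state source is passed in as a `Supplier`, so no real KafkaStreams instance is involved.

```java
import java.util.function.Supplier;

// Hypothetical polling gate; not a Kafka Streams API.
final class StartupGateSketch {
    // Mirrors the application-level states listed in this thread; not the real enum.
    enum AppState { CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING }

    // Polls the state until it reports RUNNING or the timeout expires.
    // Returns true if RUNNING was observed, i.e. the caller may start the second topology.
    static boolean awaitRunning(Supplier<AppState> state, long timeoutMillis, long pollMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (true) {
            if (state.get() == AppState.RUNNING) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
    }
}
```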
Guozhang

On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy <damian....@gmail.com> wrote:

Hi Florian,

Thanks for the KIP.

It seems there is some overlap here with what we already have in KafkaStreams.allMetadata(). This currently returns a Collection<StreamsMetadata> where each StreamsMetadata instance holds the state stores and partition assignment for every instance of the KafkaStreams application. I'm wondering whether that is good enough for what you are trying to achieve. If not, could it be modified to include the per-thread assignment?

Thanks,
Damian

On Wed, 1 Mar 2017 at 22:49 Florian Hussonnois <fhussonn...@gmail.com> wrote:

Hi Matthias,

First, I will answer your last question.

The main reason to have both TaskState#assignment and TaskState#consumedOffsetsByPartition is that tasks have no consumed offsets until at least one message is consumed for each partition, even if previous offsets exist for the consumer group. So yes, these methods are redundant, as they only diverge at application startup.
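Florian's answer can be illustrated with a small model. `assignment` and `consumedOffsetsByPartition` are the names from the early KIP draft discussed here; everything else in `TaskStateSketch` is hypothetical. A partition only gets an offset entry once a record from it has been consumed, so the two views differ at startup and coincide afterwards.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the two views discussed in the early KIP draft.
final class TaskStateSketch {
    private final Set<String> assignment; // partitions assigned to the task
    private final Map<String, Long> consumedOffsetsByPartition = new HashMap<>();

    TaskStateSketch(Set<String> assignment) {
        this.assignment = Collections.unmodifiableSet(new LinkedHashSet<>(assignment));
    }

    // Records the offset of a consumed record; a partition only gets an entry
    // once at least one record from it has been consumed.
    void onConsumed(String partition, long offset) {
        if (!assignment.contains(partition)) {
            throw new IllegalArgumentException("Not assigned: " + partition);
        }
        consumedOffsetsByPartition.put(partition, offset);
    }

    Set<String> assignment() {
        return assignment;
    }

    Set<String> partitionsWithConsumedOffsets() {
        return Collections.unmodifiableSet(consumedOffsetsByPartition.keySet());
    }

    // True while some assigned partition has not delivered a record yet (e.g. at startup).
    boolean diverges() {
        return !assignment.equals(consumedOffsetsByPartition.keySet());
    }
}
```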
About the use case: we are currently developing, for a customer, a small framework based on KafkaStreams which transforms/denormalizes data before ingesting it into Hadoop.

We have a cluster of workers (Spring Boot) which instantiate KStreams topologies dynamically based on dataflow configurations. Each configuration describes a topic to consume and how to process messages (this looks like the NiFi processors API).

Our architecture is inspired by Kafka Connect. We have topics for configs and states which are consumed by each worker (we have actually reused some internal classes of the Connect API).

Now, we would like to develop UIs to visualize the topics and partitions consumed by our worker applications.

Also, I think it would be nice to be able, in the future, to develop web UIs similar to Spark's, but for KafkaStreams, to visualize DAGs... so maybe this KIP is just a first step.

Thanks,

2017-03-01 22:52 GMT+01:00 Matthias J. Sax <matth...@confluent.io>:

Thanks for the KIP.

I am wondering a little bit why you need to expose this information. Can you describe some use cases?
Would it be worth unifying this new API with KafkaStreams#state() to get the overall state of an application without the need to call two different methods? Not sure what this unified API might look like, though.

One minor comment about the API: TaskState#assignment seems to be redundant. It should be the same as TaskState#consumedOffsetsByPartition.keySet()

Or am I missing something?

-Matthias

On 3/1/17 5:19 AM, Florian Hussonnois wrote:

Hi Eno,

Yes, but the state() method only returns the global state of the KafkaStreams application (i.e. CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING).

An alternative to this KIP would be to change this method to return more information instead of adding a new method.

2017-03-01 13:46 GMT+01:00 Eno Thereska <eno.there...@gmail.com>:

Thanks Florian,

Have you had a chance to look at the new state methods in 0.10.2, e.g., KafkaStreams.state()?
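One hypothetical shape for the unified view Matthias wonders about, or for Florian's alternative of having the state method return more information: a single snapshot carrying both the overall application state and the per-thread states. `StateSnapshotSketch` is an illustration only, not a proposal made in the thread and not an existing API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical combined snapshot: overall state plus per-thread detail.
final class StateSnapshotSketch {
    // Mirrors the application-level states listed in this thread; not the real enum.
    enum AppState { CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING }

    final AppState overall;                 // what a state()-style call returns today
    final Map<String, String> threadStates; // per-thread detail as strings, sorted by thread name

    StateSnapshotSketch(AppState overall, Map<String, String> threadStates) {
        this.overall = overall;
        this.threadStates = Collections.unmodifiableMap(new TreeMap<>(threadStates));
    }

    // Names of the threads currently reporting the given state, in sorted order.
    List<String> threadsIn(String state) {
        List<String> names = new ArrayList<>();
        for (Map.Entry<String, String> e : threadStates.entrySet()) {
            if (e.getValue().equals(state)) {
                names.add(e.getKey());
            }
        }
        return names;
    }
}
```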
Thanks
Eno

On 1 Mar 2017, at 11:54, Florian Hussonnois <fhussonn...@gmail.com> wrote:

Hi all,

I have just created KIP-130 to add a new method to the KafkaStreams API in order to expose the states of threads and active tasks.

https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API

Thanks,

--
Florian HUSSONNOIS