What about the KafkaStreams#toString() method? I think we want to deprecate it, as it becomes obsolete with KIP-120 and the changes in this KIP.
If we do so, please update the KIP accordingly.

-Matthias

On 3/28/17 7:00 PM, Matthias J. Sax wrote:
> Thanks for updating the KIP!
>
> I think it's good as is -- I would not add anything more to TaskMetadata.
>
> About subtopologies and tasks: we already have the concept of subtopologies
> in KIP-120. It's only missing an ID that allows linking a subtopology to a
> task.
>
> IMHO, adding a simple variable to `Subtopology` that provides the id
> should be sufficient. We can simply document in the JavaDocs how
> Subtopology and TaskMetadata can be linked to each other.
>
> I did update KIP-120 accordingly.
>
> -Matthias
>
> On 3/28/17 3:45 PM, Florian Hussonnois wrote:
>
> Hi all,
>
> I've updated the KIP and the PR to reflect your suggestions.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
> https://github.com/apache/kafka/pull/2612
>
> Also, I've exposed the property StreamThread#state as a string through the
> new class ThreadMetadata.
>
> Thanks,
>
> 2017-03-27 23:40 GMT+02:00 Florian Hussonnois <fhussonn...@gmail.com>:
>
> Hi Guozhang, Matthias,
>
> It's a great idea to add sub-topology descriptions. This would help
> developers better understand the topology concept.
>
> I agree that it is not really user-friendly to have to check whether
> `StreamsMetadata#streamThreads` returns null.
>
> The method name localThreadsMetadata looks good. In addition, it's
> simpler to build ThreadMetadata instances from the `StreamTask` class
> than from the `StreamPartitionAssignor` class.
>
> I will work on the modifications. As I understand it, I have to add the
> subTopologyId property to the TaskMetadata class. Am I right?
>
> Thanks,
>
> 2017-03-26 0:25 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
>
> Re 1): this is a good point.
> Maybe we can move `StreamsMetadata#streamThreads` to
> `KafkaStreams#localThreadsMetadata`?
>
> 3): this is a minor suggestion: rename the function `assignedPartitions`
> to `topicPartitions` to be consistent with `StreamsMetadata`?
>
> Guozhang
>
> On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>
> Thanks for the progress on this KIP. I think we are on the right path!
>
> A couple of comments/questions:
>
> (1) Why do we not consider the "rejected alternative" of adding the method
> to KafkaStreams? The comment on #streamThreads() says:
>
> "Note this method will return <code>null</code> if called on {@link
> StreamsMetadata} which represent a remote application."
>
> Thus, if we cannot get any remote metadata, it seems more straightforward
> to add it to KafkaStreams directly -- this would avoid invalid calls and
> the `null` return value in the first place.
>
> I like the idea of exposing sub-topologies:
>
> (2a) I would recommend renaming `topicsGroupId` to `subTopologyId` :)
>
> (2b) We could add this to KIP-120 already. However, I would not just
> link both via name, but leverage KIP-120 directly, and add a
> "Subtopology" member to the TaskMetadata class.
>
> Overall, I like the distinction of KIP-120 only exposing "static"
> information that can be determined before the topology gets started,
> while this KIP allows access to runtime information.
>
> -Matthias
>
> On 3/22/17 12:42 PM, Guozhang Wang wrote:
>
> Thanks for the updated KIP, and sorry for the late replies!
>
> I thought a little bit more about KIP-130, and I feel that if we are going
> to deprecate the `toString` function in favor of the proposed APIs, it may
> be okay (this is not stated explicitly in the KIP, so I'm not sure whether
> you plan to keep `KafkaStreams#toString` as is or to replace it with the
> proposed APIs). More specifically, after both KIP-120 and KIP-130:
>
> 1. users can use the `#describe` function to check the generated topology
> before calling `KafkaStreams#start`, which is static information.
> 2. users can use `StreamsMetadata -> ThreadMetadata -> TaskMetadata`
> programmatically after calling `KafkaStreams#start` to get the
> dynamically changing information.
>
> One thing I'm still not sure about, though, is that in `TaskMetadata` we
> only have the TaskId and assigned partitions, whereas the
> "TopologyDescription" introduced in KIP-120 will simply describe the whole
> topology, possibly composed of multiple sub-topologies. So it is hard for
> users to tell on-the-fly which sub-topology is executed under which task.
>
> Hence I'm wondering whether we can expose the "sub-topology-id" (named
> topicsGroupId internally) in TopologyDescription#Subtopology; then, from
> the task id, which is essentially "sub-topology-id DASH
> partition-group-id", users can make the link, though it is still not that
> straightforward.
>
> Thoughts?
>
> Guozhang
>
> On Wed, Mar 15, 2017 at 3:16 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
>
> Thanks Guozhang for pointing me to KIP-120.
>
> I've made some modifications to the KIP. I also proposed a new PR (there
> are still some tests to write).
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>
> Exposing consumed offsets through JMX is sufficient for debugging
> purposes. But I think this could be part of another JIRA, as there is no
> impact on the public API.
>
> Thanks
>
> 2017-03-10 22:35 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
>
> Hello Florian,
>
> As for programmatically discovering monitoring data by piping metrics
> into a dedicated topic,
> I think you can actually use a KafkaMetricsReporter which pipes the
> polled metric values into a pre-defined topic (note that in Kafka the
> MetricsReporter is simply an interface, and users can build their own
> implementation in addition to the JmxReporter), for example:
>
> https://github.com/krux/kafka-metrics-reporter
>
> As for the "static task-level assignment", what I meant is that the
> mapping from source-topic-partitions -> tasks is static, via the
> "PartitionGrouper", and a task won't switch from an active task to a
> standby task. Rather, an active task could be migrated, as a whole along
> with all its assigned partitions, to another thread / process, and a new
> standby task will be created on the host that this active task is
> migrating from. So for the SAME task, its taskMetadata.assignedPartitions()
> will always return the same partitions.
>
> As for the `toString` function that we have today, I feel it has some
> correlation with KIP-120, so I'm trying to coordinate some discussions
> here (cc'ing Matthias as the owner of KIP-120). My understanding is that:
>
> 1. In KIP-120, the `toString` function of `KafkaStreams` will be removed,
> and instead the `Topology#describe` function will be introduced for users
> to debug the topology BEFORE they start running their instance with it.
> Hence the description won't contain any task information, as tasks are
> not formed yet.
> 2. In KIP-130, we want to add the task-level information for monitoring
> purposes, which is not static and can only be captured AFTER the instance
> has started running. Again, I'm wondering whether, for KIP-130 alone,
> adding the metrics mentioned in my previous email would suffice even for
> the use case you have mentioned.
>
> Guozhang
>
> On Wed, Mar 8, 2017 at 3:18 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
>
> Hi Guozhang
>
> Thank you for your feedback. I've started to look more deeply into the
> code. As you mention, it would be smarter to use the current
> StreamsMetadata API to expose this information.
>
> I think exposing metrics through JMX is great for building monitoring
> dashboards with tools like jmxtrans and Grafana. But for our use case, we
> would like to expose the states directly from the application embedding
> the KStreams topologies, so we expect to be able to retrieve states
> programmatically.
>
> For instance, we could imagine producing those states into a dedicated
> topic. That way, a third application could automatically discover all
> Kafka Streams applications that can be monitored. In a production
> environment, that could clearly be a way to get a complete overview of a
> microservices architecture based on Kafka Streams.
>
> The toString() method gives a lot of information, but it can only be used
> for debugging purposes, not to build a topology visualization tool.
> Could we actually expose the same details about the stream topology from
> the StreamsMetadata API?
> So the TaskMetadata class you have suggested could contain information
> similar to that returned by the toString method of the AbstractTask
> class?
>
> I can update the KIP in that way.
>
> Finally, I'm not sure I understand your last point: *"Note that the
> task-level assignment information is static, i.e. it will not change
> during the runtime"*
>
> Does that mean that when a rebalance occurs, new tasks are created for the
> new assignments and old ones just switch to a standby state?
>
> Thanks,
>
> 2017-03-05 7:04 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
>
> Hello Florian,
>
> Thanks for the KIP and your detailed explanation of your use case. I
> think there are two dimensions to discuss on how to improve Streams'
> debuggability (or, more specifically, state exposure for visualization).
>
> The first question is "what information should we expose to the user".
> From your KIP I saw generally three categories:
>
> 1. The state of the thread within a process: as you mentioned, currently
> we only expose the state of the process, not the finer-grained per-thread
> state.
> 2. The state of the task. Currently the closest API to this is
> StreamsMetadata; however, it aggregates the tasks across all threads and
> only presents the aggregated set of the assigned partitions / state
> stores etc.
> We can consider extending this API with a new StreamsMetadata#tasks()
> method which returns TaskMetadata with similar fields; the
> StreamsMetadata.stateStoreNames / etc. would still return the aggregated
> results, but users could "drill down" if they want.
>
> The second question is "how should we expose them to the user". For
> example, you mentioned consumedOffsetsByPartition in the activeTasks. We
> could add this as a JMX metric based on fetch positions inside the
> consumer layer (note that Streams just embeds consumers), or we could
> consider adding it to TaskMetadata. In either case it can be visualized
> for monitoring. The reason we expose StreamsMetadata as well as State is
> that they are expected to be "polled" programmatically for interactive
> queries and also for control flows (e.g. I would like to start running my
> other topology ONLY after the first topology is up and running), while in
> your case the main purpose seems to be continuously querying them for
> monitoring etc. Personally, for such purposes I'd prefer to expose them
> via JMX only, to keep the API simpler.
>
> So, given your current motivations, I'd suggest exposing the following
> information as newly added metrics in Streams:
>
> 1. Thread-level state metric.
> 2. Task-level hosted client identifier metric (e.g. host:port).
> 3. Consumer-level per-topic/partition position metric
> (https://kafka.apache.org/documentation/#topic_fetch_monitoring).
>
> Note that the task-level assignment information is static, i.e. it will
> not change during the runtime at all, and it can already be accessed from
> the `toString()` function even before the instance starts running, so I
> think this piece of information does not need to be exposed through JMX
> as well.
>
> WDYT?
>
> Guozhang
>
> On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy <damian....@gmail.com> wrote:
>
> Hi Florian,
>
> Thanks for the KIP.
>
> It seems there is some overlap here with what we already have in
> KafkaStreams.allMetadata(). This currently returns a
> Collection<StreamsMetadata> where each StreamsMetadata instance holds the
> state stores and partition assignment for every instance of the
> KafkaStreams application. I'm wondering if that is good enough for what
> you are trying to achieve. If not, could it be modified to include the
> per-thread assignment?
>
> Thanks,
> Damian
>
> On Wed, 1 Mar 2017 at 22:49 Florian Hussonnois <fhussonn...@gmail.com> wrote:
>
> Hi Matthias,
>
> First, I will answer your last question.
>
> The main reason to have both TaskState#assignment and
> TaskState#consumedOffsetsByPartition is that a task has no consumed
> offset for a partition until at least one message has been consumed from
> it, even if previous offsets exist for the consumer group. So yes, these
> methods are redundant, as they only diverge at application startup.
>
> About the use case: we are currently developing, for a customer, a little
> framework based on KafkaStreams which transforms/denormalizes data before
> ingesting it into Hadoop.
>
> We have a cluster of workers (Spring Boot) which instantiate KStreams
> topologies dynamically based on dataflow configurations. Each
> configuration describes a topic to consume and how to process messages
> (this looks like the NiFi processors API).
>
> Our architecture is inspired by Kafka Connect. We have topics for configs
> and states which are consumed by each worker (actually, we have reused
> some internal classes from the Connect API).
>
> Now, we would like to develop UIs to visualize the topics and partitions
> consumed by our worker applications.
>
> Also, I think it would be nice to be able, in the future, to develop web
> UIs similar to Spark's but for KafkaStreams, to visualize DAGs... so
> maybe this KIP is just a first step.
>
> Thanks,
>
> 2017-03-01 22:52 GMT+01:00 Matthias J. Sax <matth...@confluent.io>:
>
> Thanks for the KIP.
>
> I am wondering a little bit why you need to expose this information.
> Can you describe some use cases?
>
> Would it be worth unifying this new API with KafkaStreams#state() to get
> the overall state of an application without the need to call two
> different methods? Not sure what this unified API might look like, though.
>
> One minor comment about the API: TaskState#assignment seems to be
> redundant. It should be the same as
> TaskState#consumedOffsetsByPartition.keySet()
>
> Or am I missing something?
>
> -Matthias
>
> On 3/1/17 5:19 AM, Florian Hussonnois wrote:
>
> Hi Eno,
>
> Yes, but the state() method only returns the global state of the
> KafkaStreams application (i.e. CREATED, RUNNING, REBALANCING,
> PENDING_SHUTDOWN, NOT_RUNNING).
>
> An alternative to this KIP would be to change this method to return more
> information instead of adding a new method.
>
> 2017-03-01 13:46 GMT+01:00 Eno Thereska <eno.there...@gmail.com>:
>
> Thanks Florian,
>
> Have you had a chance to look at the new state methods in 0.10.2, e.g.,
> KafkaStreams.state()?
>
> Thanks
> Eno
>
> On 1 Mar 2017, at 11:54, Florian Hussonnois <fhussonn...@gmail.com> wrote:
>
> Hi all,
>
> I have just created KIP-130 to add a new method to the KafkaStreams API
> in order to expose the states of threads and active tasks.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>
> Thanks,
>
> --
> Florian HUSSONNOIS
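For illustration only, here is a minimal Java sketch of how a monitoring tool might use the runtime metadata discussed in this thread: it walks thread-level and task-level metadata and links each task back to its sub-topology through the task id. The `ThreadMetadata`/`TaskMetadata` records below are hypothetical stand-ins (not the Kafka Streams classes), partitions are plain strings, and the `<subTopologyId>_<partition>` task-id layout is an assumption based on how task ids appear in Kafka Streams logs (e.g. `0_1`); a dash is accepted too, since the thread calls the separator a dash.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical stand-ins for the ThreadMetadata/TaskMetadata classes discussed in
// this thread; they are NOT the Kafka Streams classes. Partitions are plain
// "topic-partition" strings so the sketch has no Kafka dependency.
final class Kip130Sketch {

    /** Runtime view of one task: its id and the partitions it reads from. */
    record TaskMetadata(String taskId, Set<String> topicPartitions) { }

    /** Runtime view of one stream thread, with its state exposed as a string. */
    record ThreadMetadata(String threadName, String threadState, Set<TaskMetadata> activeTasks) { }

    /**
     * Extracts the sub-topology id from a task id. Assumes the id is rendered as
     * "<subTopologyId>_<partition>"; '-' is accepted as well, since the thread
     * describes the separator as a dash.
     */
    static int subTopologyIdOf(String taskId) {
        int sep = taskId.indexOf('_');
        if (sep < 0) {
            sep = taskId.indexOf('-');
        }
        if (sep <= 0) {
            throw new IllegalArgumentException("Unexpected task id: " + taskId);
        }
        return Integer.parseInt(taskId.substring(0, sep));
    }

    /** Groups the ids of all active tasks (across all threads) by sub-topology id. */
    static Map<Integer, Set<String>> tasksBySubTopology(Set<ThreadMetadata> threads) {
        Map<Integer, Set<String>> result = new TreeMap<>();
        for (ThreadMetadata thread : threads) {
            for (TaskMetadata task : thread.activeTasks()) {
                result.computeIfAbsent(subTopologyIdOf(task.taskId()), id -> new TreeSet<>())
                      .add(task.taskId());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two threads; sub-topology 0 runs as tasks 0_0 and 0_1, sub-topology 1 as task 1_0.
        Set<ThreadMetadata> threads = Set.of(
            new ThreadMetadata("app-StreamThread-1", "RUNNING", Set.of(
                new TaskMetadata("0_0", Set.of("orders-0")),
                new TaskMetadata("1_0", Set.of("app-repartition-0")))),
            new ThreadMetadata("app-StreamThread-2", "RUNNING", Set.of(
                new TaskMetadata("0_1", Set.of("orders-1")))));
        System.out.println(tasksBySubTopology(threads));
    }
}
```

Running `main` prints `{0=[0_0, 0_1], 1=[1_0]}`. Parsing the id is the indirect link Guozhang calls "not that straightforward"; exposing the id on `Subtopology`, as Matthias suggests, is what would let a tool match these keys against a topology description.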
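Matthias's question about `TaskState#assignment` versus `TaskState#consumedOffsetsByPartition.keySet()`, and Florian's answer, can be illustrated with a small model. The `TaskStateSketch` class below is hypothetical: it mirrors the first KIP-130 draft as described in the thread, is not a Kafka Streams class, and uses plain strings for partitions. A partition only gets a consumed offset once a record from it has been processed, so the two views differ until every assigned partition has delivered at least one record.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the TaskState from the first KIP-130 draft; not a Kafka
// Streams class. Partitions are plain "topic-partition" strings.
final class TaskStateSketch {
    private final Set<String> assignment;
    private final Map<String, Long> consumedOffsetsByPartition = new HashMap<>();

    TaskStateSketch(Set<String> assignment) {
        this.assignment = Set.copyOf(assignment);
    }

    /** Records the offset of the last processed record of an assigned partition. */
    void recordConsumed(String partition, long offset) {
        if (!assignment.contains(partition)) {
            throw new IllegalArgumentException("Not assigned: " + partition);
        }
        consumedOffsetsByPartition.put(partition, offset);
    }

    Set<String> assignment() {
        return assignment;
    }

    /** Only contains partitions from which at least one record has been consumed. */
    Map<String, Long> consumedOffsetsByPartition() {
        return Map.copyOf(consumedOffsetsByPartition);
    }

    /** True once the offsets map covers the whole assignment, i.e. the two views agree. */
    boolean viewsAgree() {
        return assignment.equals(consumedOffsetsByPartition.keySet());
    }

    public static void main(String[] args) {
        TaskStateSketch task = new TaskStateSketch(Set.of("orders-0", "orders-1"));
        System.out.println(task.viewsAgree()); // false: nothing consumed yet
        task.recordConsumed("orders-0", 42L);
        System.out.println(task.viewsAgree()); // false: orders-1 still has no offset
        task.recordConsumed("orders-1", 7L);
        System.out.println(task.viewsAgree()); // true: both views now match
    }
}
```

Once past startup the two accessors return the same partitions, which is why Matthias flagged `assignment` as redundant.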