Hi all,

I've updated the KIP and the PR to reflect your suggestions.
https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
https://github.com/apache/kafka/pull/2612

Also, I've exposed the StreamThread#state property as a string through the
new ThreadMetadata class.
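
To illustrate the shape of that information, here is a minimal sketch of a per-thread view that carries the thread state as a plain string together with its active tasks. The classes `ThreadInfo`, `TaskInfo` and `ThreadMetadataSketch` are hypothetical stand-ins written for this example, not the classes from the PR, and the thread name, task id and partition values are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a per-task view: a task id plus its partitions.
final class TaskInfo {
    final String taskId;
    final List<String> topicPartitions;

    TaskInfo(String taskId, List<String> topicPartitions) {
        this.taskId = taskId;
        this.topicPartitions = topicPartitions;
    }
}

// Hypothetical stand-in for a per-thread view; the state is a plain string.
final class ThreadInfo {
    final String threadName;
    final String threadState;
    final List<TaskInfo> activeTasks;

    ThreadInfo(String threadName, String threadState, List<TaskInfo> activeTasks) {
        this.threadName = threadName;
        this.threadState = threadState;
        this.activeTasks = activeTasks;
    }
}

final class ThreadMetadataSketch {
    // Builds one line per thread, followed by one indented line per active task.
    static List<String> describe(List<ThreadInfo> threads) {
        List<String> lines = new ArrayList<>();
        for (ThreadInfo thread : threads) {
            lines.add(thread.threadName + " [" + thread.threadState + "]");
            for (TaskInfo task : thread.activeTasks) {
                lines.add("  task " + task.taskId + " -> " + task.topicPartitions);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        // Illustrative values only.
        TaskInfo task = new TaskInfo("0_0", Arrays.asList("input-0"));
        ThreadInfo thread = new ThreadInfo("StreamThread-1", "RUNNING", Arrays.asList(task));
        for (String line : describe(Arrays.asList(thread))) {
            System.out.println(line);
        }
    }
}
```

Because the state is a string, a monitoring application could forward it (for example to a dedicated topic) without depending on the internal StreamThread state enum.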

Thanks,

2017-03-27 23:40 GMT+02:00 Florian Hussonnois <fhussonn...@gmail.com>:

> Hi Guozhang, Matthias,
>
> It's a great idea to add sub-topology descriptions. This would help
> developers better understand the topology concept.
>
> I agree that it is not really user-friendly to have to check whether
> `StreamsMetadata#streamThreads` returns null.
>
> The method name localThreadsMetadata looks good. In addition, it's
> simpler to build ThreadMetadata instances from the `StreamTask` class than
> from the `StreamPartitionAssignor` class.
>
> I will work on the modifications. As I understand it, I have to add the
> subTopologyId property to the TaskMetadata class. Am I right?
>
> Thanks,
>
> 2017-03-26 0:25 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
>
>> Re 1): this is a good point. Maybe we can move
>> `StreamsMetadata#streamThreads` to `KafkaStreams#localThreadsMetadata`?
>>
>> 3): this is a minor suggestion: rename the function
>> `assignedPartitions` to `topicPartitions`, to be consistent with
>> `StreamsMetadata`?
>>
>>
>> Guozhang
>>
>> On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>
>>> Thanks for the progress on this KIP. I think we are on the right path!
>>>
>>> Couple of comments/questions:
>>>
>>> (1) Why do we not consider the "rejected alternative" to add the method
>>> to KafkaStreams? The comment on #streamThreads() says:
>>>
>>> "Note this method will return <code>null</code> if called on {@link
>>> StreamsMetadata} which represent a remote application."
>>>
>>> Thus, if we cannot get any remote metadata, it seems more straightforward
>>> to add it to KafkaStreams directly -- this would avoid
>>> invalid calls and `null` return value in the first place.
>>>
>>> I like the idea of exposing sub-topologies:
>>>
>>> (2a) I would recommend to rename `topicsGroupId` to `subTopologyId` :)
>>>
>>> (2b) We could add this to KIP-120 already. However, I would not just
>>> link both via name, but leverage KIP-120 directly, and add a
>>> "Subtopology" member to the TaskMetadata class.
>>>
>>>
>>> Overall, I like the distinction of KIP-120 only exposing "static"
>>> information that can be determined before the topology gets started,
>>> while this KIP allows access to runtime information.
>>>
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 3/22/17 12:42 PM, Guozhang Wang wrote:
>>> > Thanks for the updated KIP, and sorry for the late replies!
>>> >
>>> > I thought a little bit more about KIP-130, and I feel that if we are
>>> > going to deprecate the `toString` function in favor of the proposed
>>> > APIs (it is not explicitly said in the KIP, so I'm not sure whether you
>>> > plan to keep `KafkaStreams#toString` as is or replace it), it may be
>>> > okay. More specifically, after both KIP-120 and KIP-130:
>>> >
>>> > 1. users can use the `#describe` function to check the generated topology
>>> > before calling `KafkaStreams#start`, which is static information.
>>> > 2. users can use `StreamsMetadata -> ThreadMetadata -> TaskMetadata`
>>> > programmatically after calling `KafkaStreams#start` to get the
>>> > dynamically changeable information.
>>> >
>>> > One thing I'm still not sure about, though, is that in `TaskMetadata` we
>>> > only have the TaskId and assigned partitions, whereas the
>>> > "TopologyDescription" introduced in KIP-120 simply describes the whole
>>> > topology, possibly composed of multiple sub-topologies. So it is hard
>>> > for users to tell which sub-topology is executed under which task
>>> > on-the-fly.
>>> >
>>> > Hence I'm wondering if we can expose the "sub-topology-id" (named
>>> > topicsGroupId internally) in TopologyDescription#Subtopology, and then
>>> > from the task id, which is essentially "sub-topology-id DASH
>>> > partition-group-id", users can make the link, though it is still not
>>> > that straightforward.
>>> >
>>> > Thoughts?
>>> >
>>> > Guozhang
>>> >
>>> >
>>> >
>>> > On Wed, Mar 15, 2017 at 3:16 PM, Florian Hussonnois
>>> > <fhussonn...@gmail.com <mailto:fhussonn...@gmail.com>> wrote:
>>> >
>>> >     Thanks Guozhang for pointing me to the KIP-120.
>>> >
>>> >     I've made some modifications to the KIP. I also proposed a new PR
>>> >     (there are still some tests to write).
>>> >     https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>> >
>>> >     Exposing consumed offsets through JMX is sufficient for debugging
>>> >     purposes. But I think this could be part of another JIRA, as there is
>>> >     no impact on the public API.
>>> >
>>> >     Thanks
>>> >
>>> >     2017-03-10 22:35 GMT+01:00 Guozhang Wang <wangg...@gmail.com
>>> >     <mailto:wangg...@gmail.com>>:
>>>
>>> >
>>> >     > Hello Florian,
>>> >     >
>>> >     > As for programmatically discovering monitoring data by piping
>>> >     > metrics into a dedicated topic, I think you can actually use a
>>> >     > KafkaMetricsReporter which pipes the polled metric values into a
>>> >     > pre-defined topic (note that in Kafka the MetricsReporter is simply
>>> >     > an interface and users can build their own impl in addition to the
>>> >     > JMXReporter), for example:
>>> >     >
>>> >     > https://github.com/krux/kafka-metrics-reporter
>>> >     >
>>> >     > As for the "static task-level assignment", what I meant is that the
>>> >     > mapping from source-topic-partitions -> tasks is static, via the
>>> >     > "PartitionGrouper", and a task won't switch from an active task to a
>>> >     > standby task. Rather, an active task could be migrated, as a whole
>>> >     > along with all its assigned partitions, to another thread / process,
>>> >     > and a new standby task will be created on the host that this active
>>> >     > task is migrating from. So for the SAME task, its
>>> >     > taskMetadata.assignedPartitions() will always return you the same
>>> >     > partitions.
>>> >     >
>>> >     > As for the `toString` function that we have today, I feel it has
>>> >     > some correlation with KIP-120, so I'm trying to coordinate some
>>> >     > discussions here (cc'ing Matthias as the owner of KIP-120). My
>>> >     > understanding is that:
>>> >     >
>>> >     > 1. In KIP-120, the `toString` function of `KafkaStreams` will be
>>> >     > removed and instead the `Topology#describe` function will be
>>> >     > introduced for users to debug the topology BEFORE they start running
>>> >     > their instance with the topology. Hence the description won't
>>> >     > contain any task information, as tasks are not formed yet.
>>> >     > 2. In KIP-130, we want to add the task-level information for
>>> >     > monitoring purposes, which is not static and can only be captured
>>> >     > AFTER the instance has started running. Again I'm wondering, for
>>> >     > KIP-130 alone, if adding those metrics mentioned in my previous
>>> >     > email would suffice even for the use case that you have mentioned.
>>> >     >
>>> >     >
>>> >     > Guozhang
>>> >     >
>>> >     > On Wed, Mar 8, 2017 at 3:18 PM, Florian Hussonnois
>>> >     <fhussonn...@gmail.com <mailto:fhussonn...@gmail.com>>
>>>
>>> >     > wrote:
>>> >     >
>>> >     > > Hi Guozhang
>>> >     > >
>>> >     > > Thank you for your feedback. I've started to look more deeply into
>>> >     > > the code. As you mention, it would be smarter to use the current
>>> >     > > StreamsMetadata API to expose this information.
>>> >     > >
>>> >     > > I think exposing metrics through JMX is great for building
>>> >     > > monitoring dashboards using tools like jmxtrans and grafana.
>>> >     > > But for our use case we would like to expose the states directly
>>> >     > > from the application embedding the kstreams topologies.
>>> >     > > So we expect to be able to retrieve states in a programmatic way.
>>> >     > >
>>> >     > > For instance, we could imagine producing those states into a
>>> >     > > dedicated topic. That way, a third application could automatically
>>> >     > > discover all kafka-streams applications which could be monitored.
>>> >     > > In a production environment, that can clearly be a solution for
>>> >     > > getting a complete overview of a microservices architecture based
>>> >     > > on Kafka Streams.
>>> >     > >
>>> >     > > The toString() method gives a lot of information, but it can only
>>> >     > > be used for debugging purposes, not to build a topology
>>> >     > > visualization tool. Could we actually expose the same details about
>>> >     > > the stream topology from the StreamsMetadata API? The TaskMetadata
>>> >     > > class you have suggested could then contain information similar to
>>> >     > > what is returned by the toString method of the AbstractTask class?
>>> >     > >
>>> >     > > I can update the KIP in that way.
>>> >     > >
>>> >     > > Finally, I'm not sure I understand your last point: "Note that the
>>> >     > > task-level assignment information is static, i.e. it will not
>>> >     > > change during the runtime"
>>> >     > >
>>> >     > > Does that mean that when a rebalance occurs, new tasks are created
>>> >     > > for the new assignments and old ones just switch to a standby
>>> >     > > state?
>>> >     > >
>>> >     > > Thanks,
>>> >     > >
>>> >     > > 2017-03-05 7:04 GMT+01:00 Guozhang Wang <wangg...@gmail.com
>>> >     <mailto:wangg...@gmail.com>>:
>>>
>>> >     > >
>>> >     > > > Hello Florian,
>>> >     > > >
>>> >     > > > Thanks for the KIP and your detailed explanation of your use
>>> >     > > > case. I think there are two dimensions to discuss on how to
>>> >     > > > improve Streams' debuggability (or more specifically state
>>> >     > > > exposure for visualization).
>>> >     > > >
>>> >     > > > The first question is "what information should we expose to the
>>> >     > > > user". From your KIP I saw generally three categories:
>>> >     > > >
>>> >     > > > 1. The state of the thread within a process. As you mentioned,
>>> >     > > > currently we only expose the state of the process but not the
>>> >     > > > finer-grained per-thread state.
>>> >     > > > 2. The state of the task. Currently the closest API to this is
>>> >     > > > StreamsMetadata; however, it aggregates the tasks across all
>>> >     > > > threads and only presents the aggregated set of the assigned
>>> >     > > > partitions / state stores etc. We can consider extending it with
>>> >     > > > a new StreamsMetadata#tasks() which returns a TaskMetadata with
>>> >     > > > similar fields; StreamsMetadata.stateStoreNames / etc would still
>>> >     > > > return the aggregated results, but users can "drill down" if
>>> >     > > > they want.
>>> >     > > >
>>> >     > > > The second question is "how should we expose them to the user".
>>> >     > > > For example, you mentioned consumedOffsetsByPartition in the
>>> >     > > > activeTasks. We could add this as a JMX metric based on fetch
>>> >     > > > positions inside the consumer layer (note that Streams is just
>>> >     > > > embedding consumers) or we could consider adding it into
>>> >     > > > TaskMetadata. In either case it can be visualized for monitoring.
>>> >     > > > The reason we expose StreamsMetadata as well as State was that
>>> >     > > > they are expected to be "polled" in a programmatic way for
>>> >     > > > interactive queries and also for control flows (e.g. I would like
>>> >     > > > to start running my other topology ONLY once the first topology
>>> >     > > > is up and running), while for your case it seems the main purpose
>>> >     > > > is to continuously query them for monitoring etc. Personally I'd
>>> >     > > > prefer to expose them through JMX only for such purposes, to keep
>>> >     > > > the API simpler.
>>> >     > > >
>>> >     > > > So given your current motivations I'd suggest exposing the
>>> >     > > > following information as newly added metrics in Streams:
>>> >     > > >
>>> >     > > > 1. Thread-level state metric.
>>> >     > > > 2. Task-level hosted client identifier metric (e.g. host:port).
>>> >     > > > 3. Consumer-level per-topic/partition position metric (
>>> >     > > > https://kafka.apache.org/documentation/#topic_fetch_monitoring).
>>> >     > > >
>>> >     > > > Note that the task-level assignment information is static, i.e.
>>> >     > > > it will not change during the runtime at all and can already be
>>> >     > > > accessed from the `toString()` function even before the instance
>>> >     > > > starts running, so I think this piece of information does not
>>> >     > > > need to be exposed through JMX anymore.
>>> >     > > >
>>> >     > > > WDYT?
>>> >     > > >
>>> >     > > > Guozhang
>>> >     > > >
>>> >     > > >
>>> >     > > > On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy
>>> >     <damian....@gmail.com <mailto:damian....@gmail.com>>
>>> >     > wrote:
>>> >     > > >
>>> >     > > > > Hi Florian,
>>> >     > > > >
>>> >     > > > > Thanks for the KIP.
>>> >     > > > >
>>> >     > > > > It seems there is some overlap here with what we already have
>>> >     > > > > in KafkaStreams.allMetadata(). This currently returns a
>>> >     > > > > Collection<StreamsMetadata> where each StreamsMetadata instance
>>> >     > > > > holds the state stores and partition assignment for every
>>> >     > > > > instance of the KafkaStreams application. I'm wondering if that
>>> >     > > > > is good enough for what you are trying to achieve? If not,
>>> >     > > > > could it be modified to include the per-thread assignment?
>>> >     > > > >
>>> >     > > > > Thanks,
>>> >     > > > > Damian
>>> >     > > > >
>>> >     > > > >
>>> >     > > > >
>>> >     > > > >
>>> >     > > > >
>>> >     > > > >
>>> >     > > > > On Wed, 1 Mar 2017 at 22:49 Florian Hussonnois <
>>> >     > fhussonn...@gmail.com <mailto:fhussonn...@gmail.com>>
>>>
>>> >     > > > > wrote:
>>> >     > > > >
>>> >     > > > > > Hi Matthias,
>>> >     > > > > >
>>> >     > > > > > First, I will answer your last question.
>>> >     > > > > >
>>> >     > > > > > The main reason to have both TaskState#assignment and
>>> >     > > > > > TaskState#consumedOffsetsByPartition is that tasks have no
>>> >     > > > > > consumed offsets until at least one message is consumed for
>>> >     > > > > > each partition, even if previous offsets exist for the
>>> >     > > > > > consumer group.
>>> >     > > > > > So yes, these methods are redundant, as they only diverge at
>>> >     > > > > > application startup.
>>> >     > > > > >
>>> >     > > > > > About the use case: we are currently developing for a
>>> >     > > > > > customer a little framework based on KafkaStreams which
>>> >     > > > > > transforms/denormalizes data before ingesting it into Hadoop.
>>> >     > > > > >
>>> >     > > > > > We have a cluster of workers (SpringBoot) which instantiate
>>> >     > > > > > KStreams topologies dynamically based on dataflow
>>> >     > > > > > configurations. Each configuration describes a topic to
>>> >     > > > > > consume and how to process messages (this looks like the NiFi
>>> >     > > > > > processors API).
>>> >     > > > > >
>>> >     > > > > > Our architecture is inspired by KafkaConnect. We have topics
>>> >     > > > > > for configs and states which are consumed by each worker
>>> >     > > > > > (actually we have reused some internal classes of the Connect
>>> >     > > > > > API).
>>> >     > > > > >
>>> >     > > > > > Now, we would like to develop UIs to visualize topics and
>>> >     > > > > > partitions consumed by our worker applications.
>>> >     > > > > >
>>> >     > > > > > Also, I think it would be nice to be able, in the future, to
>>> >     > > > > > develop web UIs similar to Spark's but for KafkaStreams to
>>> >     > > > > > visualize DAGs... so maybe this KIP is just a first step.
>>> >     > > > > >
>>> >     > > > > > Thanks,
>>> >     > > > > >
>>> >     > > > > > 2017-03-01 22:52 GMT+01:00 Matthias J. Sax
>>> >     <matth...@confluent.io <mailto:matth...@confluent.io>
>>>
>>> >     > >:
>>> >     > > > > >
>>> >     > > > > > > Thanks for the KIP.
>>> >     > > > > > >
>>> >     > > > > > > I am wondering a little bit why you need to expose this
>>> >     > > > > > > information. Can you describe some use cases?
>>> >     > > > > > >
>>> >     > > > > > > Would it be worth unifying this new API with
>>> >     > > > > > > KafkaStreams#state() to get the overall state of an
>>> >     > > > > > > application without the need to call two different methods?
>>> >     > > > > > > Not sure what this unified API might look like, though.
>>> >     > > > > > >
>>> >     > > > > > >
>>> >     > > > > > > One minor comment about the API: TaskState#assignment seems
>>> >     > > > > > > to be redundant. It should be the same as
>>> >     > > > > > > TaskState#consumedOffsetsByPartition.keySet()
>>> >     > > > > > >
>>> >     > > > > > > Or am I missing something?
>>> >     > > > > > >
>>> >     > > > > > >
>>> >     > > > > > > -Matthias
>>> >     > > > > > >
>>> >     > > > > > > On 3/1/17 5:19 AM, Florian Hussonnois wrote:
>>> >     > > > > > > > Hi Eno,
>>> >     > > > > > > >
>>> >     > > > > > > > Yes, but the state() method only returns the global state
>>> >     > > > > > > > of the KafkaStreams application (i.e. CREATED, RUNNING,
>>> >     > > > > > > > REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING).
>>> >     > > > > > > >
>>> >     > > > > > > > An alternative to this KIP would be to change this method
>>> >     > > > > > > > to return more information instead of adding a new method.
>>> >     > > > > > > >
>>> >     > > > > > > > 2017-03-01 13:46 GMT+01:00 Eno Thereska <
>>> >     > eno.there...@gmail.com <mailto:eno.there...@gmail.com>
>>> >     > > >:
>>> >     > > > > > > >
>>> >     > > > > > > >> Thanks Florian,
>>> >     > > > > > > >>
>>> >     > > > > > > >> Have you had a chance to look at the new state methods in
>>> >     > > > > > > >> 0.10.2, e.g., KafkaStreams.state()?
>>> >     > > > > > > >>
>>> >     > > > > > > >> Thanks
>>> >     > > > > > > >> Eno
>>> >     > > > > > > >>> On 1 Mar 2017, at 11:54, Florian Hussonnois <
>>> >     > > > fhussonn...@gmail.com <mailto:fhussonn...@gmail.com>
>>>
>>> >     > > > > >
>>> >     > > > > > > >> wrote:
>>> >     > > > > > > >>>
>>> >     > > > > > > >>> Hi all,
>>> >     > > > > > > >>>
>>> >     > > > > > > >>> I have just created KIP-130 to add a new method to the
>>> >     > > > > > > >>> KafkaStreams API in order to expose the states of threads
>>> >     > > > > > > >>> and active tasks.
>>> >     > > > > > > >>>
>>> >     > > > > > > >>>
>>> >     > > > > > > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>> >     > > > > > > >>>
>>> >     > > > > > > >>>
>>> >     > > > > > > >>> Thanks,
>>> >     > > > > > > >>>
>>> >     > > > > > > >>> --
>>> >     > > > > > > >>> Florian HUSSONNOIS
>>> >     > > > > > > >>
>>> >     > > > > > > >>
>>> >     > > > > > > >
>>> >     > > > > > > >
>>> >     > > > > > >
>>> >     > > > > > >
>>> >     > > > > >
>>> >     > > > > >
>>> >     > > > > > --
>>> >     > > > > > Florian HUSSONNOIS
>>> >     > > > > >
>>> >     > > > >
>>> >     > > >
>>> >     > > >
>>> >     > > >
>>> >     > > > --
>>> >     > > > -- Guozhang
>>> >     > > >
>>> >     > >
>>> >     > >
>>> >     > >
>>> >     > > --
>>> >     > > Florian HUSSONNOIS
>>> >     > >
>>> >     >
>>> >     >
>>> >     >
>>> >     > --
>>> >     > -- Guozhang
>>> >     >
>>> >
>>> >
>>> >
>>> >     --
>>> >     Florian HUSSONNOIS
>>> >
>>> >
>>> >
>>> >
>>> > --
>>> > -- Guozhang
>>>
>>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
>
> --
> Florian HUSSONNOIS
>



-- 
Florian HUSSONNOIS
