Thanks for the updated KIP, and sorry for the late replies!

I thought a little bit more about KIP-130, and I feel that if we are going to
deprecate the `toString` function in favor of the proposed APIs, it may be
okay. (This is not explicitly stated in the KIP, so I'm not sure whether you
plan to keep `KafkaStreams#toString` as is or replace it with the proposed
APIs.) More specifically, after both KIP-120 and KIP-130:

1. users can use the `#describe` function to check the generated topology
before calling `KafkaStreams#start`, which is static information.
2. users can use `StreamsMetadata -> ThreadMetadata -> TaskMetadata`
programmatically after calling `KafkaStreams#start` to get the dynamically
changeable information.

One thing I'm still not sure about, though, is that in `TaskMetadata` we only
have the TaskId and assigned partitions, whereas the "TopologyDescription"
introduced in KIP-120 simply describes the whole topology, possibly
composed of multiple sub-topologies. So it is hard for users to tell which
sub-topology is executed under which task on-the-fly.

Hence I'm wondering if we can expose the "sub-topology-id" (named
topicsGroupId internally) in TopologyDescription#Subtopology, and then from
the task id, which is essentially "sub-topology-id DASH partition-group-id",
users can make the link, though it is still not that straightforward.
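
To illustrate how users might make that link, here is a minimal sketch. It is
not actual Streams code: the class `TaskIdLink` and its methods are
hypothetical names, and it only assumes the "sub-topology-id, separator,
partition-group-id" shape of the task id described above. The separator is
passed in as a parameter rather than hard-coded, since the exact character is
an assumption here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the Kafka Streams API.
public class TaskIdLink {

    // Returns the sub-topology id, i.e. the part of the task id before the separator.
    static int subtopologyIdOf(String taskId, char separator) {
        int idx = taskId.indexOf(separator);
        if (idx <= 0 || idx == taskId.length() - 1) {
            throw new IllegalArgumentException("Unexpected task id format: " + taskId);
        }
        return Integer.parseInt(taskId.substring(0, idx));
    }

    // Groups task ids by their sub-topology id, ordered by sub-topology id.
    static Map<Integer, List<String>> groupBySubtopology(List<String> taskIds, char separator) {
        Map<Integer, List<String>> bySubtopology = new TreeMap<>();
        for (String taskId : taskIds) {
            bySubtopology
                .computeIfAbsent(subtopologyIdOf(taskId, separator), id -> new ArrayList<>())
                .add(taskId);
        }
        return bySubtopology;
    }

    public static void main(String[] args) {
        // Task ids written in the assumed "sub-topology-id DASH partition-group-id" form.
        System.out.println(groupBySubtopology(List.of("0-0", "0-1", "1-0"), '-'));
    }
}
```

With the sub-topology id exposed in the description, a user could look up the
key of this map there to find which sub-topology each task executes.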

Thoughts?

Guozhang



On Wed, Mar 15, 2017 at 3:16 PM, Florian Hussonnois <fhussonn...@gmail.com>
wrote:

> Thanks Guozhang for pointing me to the KIP-120.
>
> I've made some modifications to the KIP. I also proposed a new PR (there are
> still some tests to write).
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>
> Exposing consumed offsets through JMX is sufficient for debugging purposes.
> But I think this could be part of another JIRA as there is no impact on
> the public API.
>
> Thanks
>
> 2017-03-10 22:35 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
>
> > Hello Florian,
> >
> > As for programmatically discovering monitoring data by piping metrics into
> > a dedicated topic, I think you can actually use a KafkaMetricsReporter which
> > pipes the polled metric values into a pre-defined topic (note that in Kafka
> > the MetricsReporter is simply an interface and users can build their own
> > impl in addition to the JMXReporter), for example:
> >
> > https://github.com/krux/kafka-metrics-reporter
> >
> > As for the "static task-level assignment", what I meant is that the mapping
> > from source-topic-partitions -> tasks is static, via the
> > "PartitionGrouper", and a task won't switch from an active task to a
> > standby task. Rather, an active task could be migrated, as a whole
> > along with all its assigned partitions, to another thread / process,
> > and a new standby task will be created on the host that this active task is
> > migrating from. So for the SAME task, its taskMetadata.assignedPartitions()
> > will always return you the same partitions.
> >
> > As for the `toString` function that we have today, I feel it has some
> > correlation with KIP-120, so I'm trying to coordinate some discussions here
> > (cc'ing Matthias as the owner of KIP-120). My understanding is that:
> >
> > 1. In KIP-120, the `toString` function of `KafkaStreams` will be removed
> > and instead the `Topology#describe` function will be introduced for users
> > to debug the topology BEFORE they start running their instance with the
> > topology. Hence the description won't contain any task information, as
> > tasks are not formed yet.
> > 2. In KIP-130, we want to add the task-level information for monitoring
> > purposes, which is not static and can only be captured AFTER the instance
> > has started running. Again, I'm wondering for KIP-130 alone if adding those
> > metrics mentioned in my previous email would suffice even for the use case
> > that you have mentioned.
> >
> >
> > Guozhang
> >
> > On Wed, Mar 8, 2017 at 3:18 PM, Florian Hussonnois <
> fhussonn...@gmail.com>
> > wrote:
> >
> > > Hi Guozhang
> > >
> > > Thank you for your feedback. I've started to look more deeply into the
> > > code. As you mention, it would be better to use the current
> > > StreamsMetadata API to expose this information.
> > >
> > > I think exposing metrics through JMX is great for building monitoring
> > > dashboards using tools like jmxtrans and grafana.
> > > But for our use case we would like to expose the states directly from the
> > > application embedding the kstreams topologies.
> > > So we expect to be able to retrieve states in a programmatic way.
> > >
> > > For instance, we could imagine producing those states into a dedicated
> > > topic. In that way a third application could automatically discover all
> > > kafka-streams applications which could be monitored.
> > > In a production environment, that can clearly be a solution to have a
> > > complete overview of a microservices architecture based on Kafka Streams.
> > >
> > > The toString() method gives a lot of information, but it can only be used
> > > for debugging purposes, not to build a topology visualization tool. Could
> > > we actually expose the same details about the stream topology from the
> > > StreamsMetadata API? The TaskMetadata class you have suggested could then
> > > contain information similar to what is returned by the toString method of
> > > the AbstractTask class?
> > >
> > > I can update the KIP in that way.
> > >
> > > Finally, I'm not sure I understand your last point: "Note that the
> > > task-level assignment information is static, i.e. it will not change
> > > during the runtime"
> > >
> > > Does that mean that when a rebalance occurs, new tasks are created for the
> > > new assignments and old ones just switch to a standby state?
> > >
> > > Thanks,
> > >
> > > 2017-03-05 7:04 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
> > >
> > > > Hello Florian,
> > > >
> > > > Thanks for the KIP and your detailed explanation of your use case. I
> > > think
> > > > there are two dimensions to discuss on how to improve Streams'
> > > > debuggability (or more specifically state exposure for
> visualization).
> > > >
> > > > First question is "what information should we expose to the user".
> From
> > > > your KIP I saw generally three categories:
> > > >
> > > > 1. The state of the thread within a process: as you mentioned, currently
> > > > we only expose the state of the process but not the finer-grained
> > > > per-thread state.
> > > > 2. The state of the task. Currently the closest API to this is
> > > > StreamsMetadata; however, it aggregates the tasks across all threads and
> > > > only presents the aggregated set of the assigned partitions / state
> > > > stores etc. We can consider extending this API to have a new
> > > > StreamsMetadata#tasks() which returns a TaskMetadata with similar
> > > > fields, and the StreamsMetadata.stateStoreNames / etc would still be
> > > > returning the aggregated results but users can still "drill down" if
> > > > they want.
> > > >
> > > > The second question is "how should we expose them to the user". For
> > > > example, you mentioned consumedOffsetsByPartition in the activeTasks.
> > > > We could add this as a JMX metric based on fetch positions inside the
> > > > consumer layer (note that Streams is just embedding consumers) or we
> > > > could consider adding it into TaskMetadata. In either case it can be
> > > > visualized for monitoring. The reason we expose StreamsMetadata as well
> > > > as State was that they are expected to be "polled" in a programmatic way
> > > > for interactive queries and also for control flows (e.g. I would like to
> > > > start running my other topology ONLY after the first topology has been
> > > > up and running), while for your case it seems the main purpose is to
> > > > continuously query them for monitoring etc. Personally I'd prefer to
> > > > expose them as JMX only for such purposes, to have a simpler API.
> > > >
> > > > So given your current motivations I'd suggest exposing the following
> > > > information as newly added metrics in Streams:
> > > >
> > > > 1. Thread-level state metric.
> > > > 2. Task-level hosted client identifier metric (e.g. host:port).
> > > > 3. Consumer-level per-topic/partition position metric (
> > > > https://kafka.apache.org/documentation/#topic_fetch_monitoring).
> > > >
> > > > Note that the task-level assignment information is static, i.e. it will
> > > > not change during the runtime at all and can be accessed from the
> > > > `toString()` function already, even before the instance starts running,
> > > > so I think this piece of information does not need to be exposed through
> > > > JMX anymore.
> > > >
> > > > WDYT?
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy <damian....@gmail.com>
> > wrote:
> > > >
> > > > > Hi Florian,
> > > > >
> > > > > Thanks for the KIP.
> > > > >
> > > > > It seems there is some overlap here with what we already have in
> > > > > KafkaStreams.allMetadata(). This currently returns a
> > > > > Collection<StreamsMetadata> where each StreamsMetadata instance
> holds
> > > the
> > > > > state stores and partition assignment for every instance of the
> > > > > KafkaStreams application. I'm wondering if that is good enough for
> > what
> > > > you
> > > > > are trying to achieve? If not could it be modified to include the
> per
> > > > > Thread assignment?
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, 1 Mar 2017 at 22:49 Florian Hussonnois <
> > fhussonn...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Matthias,
> > > > > >
> > > > > > First, I will answer your last question.
> > > > > >
> > > > > > The main reason to have both TaskState#assignment and
> > > > > > TaskState#consumedOffsetsByPartition is that tasks have no consumed
> > > > > > offsets until at least one message is consumed for each partition,
> > > > > > even if previous offsets exist for the consumer group.
> > > > > > So yes, these methods are redundant, as they only diverge at
> > > > > > application startup.
> > > > > >
> > > > > > About the use case: we are currently developing for a customer a
> > > > > > little framework based on KafkaStreams which transforms/denormalizes
> > > > > > data before ingesting it into hadoop.
> > > > > >
> > > > > > We have a cluster of workers (SpringBoot) which instantiate KStreams
> > > > > > topologies dynamically based on dataflow configurations.
> > > > > > Each configuration describes a topic to consume and how to process
> > > > > > messages (this looks like the NiFi processors API).
> > > > > >
> > > > > > Our architecture is inspired by KafkaConnect. We have topics for
> > > > > > configs and states which are consumed by each worker (actually we
> > > > > > have reused some internal classes of the connect API).
> > > > > >
> > > > > > Now, we would like to develop UIs to visualize topics and
> > partitions
> > > > > > consumed by our worker applications.
> > > > > >
> > > > > > Also, I think it would be nice to be able, in the future, to develop
> > > > > > web UIs similar to Spark's but for KafkaStreams to visualize DAGs...
> > > > > > so maybe this KIP is just a first step.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > 2017-03-01 22:52 GMT+01:00 Matthias J. Sax <
> matth...@confluent.io
> > >:
> > > > > >
> > > > > > > Thanks for the KIP.
> > > > > > >
> > > > > > > I am wondering a little bit, why you need to expose this
> > > information.
> > > > > > > Can you describe some use cases?
> > > > > > >
> > > > > > > Would it be worth unifying this new API with KafkaStreams#state()
> > > > > > > to get the overall state of an application without the need to
> > > > > > > call two different methods? Not sure what this unified API might
> > > > > > > look like though.
> > > > > > >
> > > > > > >
> > > > > > > One minor comment about the API: TaskState#assignment seems to
> be
> > > > > > > redundant. It should be the same as
> > > > > > > TaskState#consumedOffsetsByPartition.keySet()
> > > > > > >
> > > > > > > Or am I missing something?
> > > > > > >
> > > > > > >
> > > > > > > -Matthias
> > > > > > >
> > > > > > > On 3/1/17 5:19 AM, Florian Hussonnois wrote:
> > > > > > > > Hi Eno,
> > > > > > > >
> > > > > > > > > Yes, but the state() method only returns the global state of
> > > > > > > > > the KafkaStreams application (i.e. CREATED, RUNNING,
> > > > > > > > > REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING).
> > > > > > > >
> > > > > > > > An alternative to this KIP would be to change this method to
> > > return
> > > > > > more
> > > > > > > > information instead of adding a new method.
> > > > > > > >
> > > > > > > > 2017-03-01 13:46 GMT+01:00 Eno Thereska <
> > eno.there...@gmail.com
> > > >:
> > > > > > > >
> > > > > > > >> Thanks Florian,
> > > > > > > >>
> > > > > > > >> Have you had a chance to look at the new state methods in
> > > 0.10.2,
> > > > > > e.g.,
> > > > > > > >> KafkaStreams.state()?
> > > > > > > >>
> > > > > > > >> Thanks
> > > > > > > >> Eno
> > > > > > > >>> On 1 Mar 2017, at 11:54, Florian Hussonnois <
> > > > fhussonn...@gmail.com
> > > > > >
> > > > > > > >> wrote:
> > > > > > > >>>
> > > > > > > >>> Hi all,
> > > > > > > >>>
> > > > > > > >>> I have just created KIP-130 to add a new method to the
> > > > KafkaStreams
> > > > > > API
> > > > > > > >> in
> > > > > > > >>> order to expose the states of threads and active tasks.
> > > > > > > >>>
> > > > > > > > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>> Thanks,
> > > > > > > >>>
> > > > > > > >>> --
> > > > > > > >>> Florian HUSSONNOIS
> > > > > > > >>
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Florian HUSSONNOIS
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> > >
> > >
> > > --
> > > Florian HUSSONNOIS
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> Florian HUSSONNOIS
>



-- 
-- Guozhang
