Hello Florian,

As for programmatically discovering monitoring data by piping metrics into a
dedicated topic: I think you can actually use a KafkaMetricsReporter which
pipes the polled metric values into a pre-defined topic (note that in Kafka
the MetricsReporter is simply an interface, and users can build their own
implementation in addition to the JmxReporter). For example:

https://github.com/krux/kafka-metrics-reporter
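
To make the idea concrete, below is a minimal sketch of the polling side of
such a reporter, written against plain Java only. The names
`MetricsTopicPublisher`, `register`, `unregister` and `publishOnce` are
hypothetical and are not part of Kafka or of the project linked above. In a
real reporter this logic would presumably sit behind the MetricsReporter
callbacks, and the sink would be a producer writing to the pre-defined topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** Hypothetical sketch: polls registered metrics and forwards each value to a sink. */
public class MetricsTopicPublisher {
    // Metric name -> supplier of its current value (a stand-in for Kafka's metric objects).
    private final Map<String, Supplier<Double>> metrics = new ConcurrentHashMap<>();
    // Receives (key, value) pairs; a real reporter would send them to a Kafka topic.
    private final BiConsumer<String, String> sink;

    public MetricsTopicPublisher(BiConsumer<String, String> sink) {
        this.sink = sink;
    }

    /** Starts tracking a metric (analogous to a metric being added or changed). */
    public void register(String name, Supplier<Double> value) {
        metrics.put(name, value);
    }

    /** Stops tracking a metric (analogous to a metric being removed). */
    public void unregister(String name) {
        metrics.remove(name);
    }

    /** Polls every registered metric once, emits one record per metric, returns the count. */
    public int publishOnce() {
        int sent = 0;
        for (Map.Entry<String, Supplier<Double>> entry : metrics.entrySet()) {
            sink.accept(entry.getKey(), String.valueOf(entry.getValue().get()));
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) {
        // An in-memory list stands in for the dedicated monitoring topic.
        List<String> records = new ArrayList<>();
        MetricsTopicPublisher publisher =
            new MetricsTopicPublisher((key, value) -> records.add(key + "=" + value));
        publisher.register("stream-thread-1.state", () -> 1.0);
        publisher.publishOnce();
        System.out.println(records);
    }
}
```

A third application could then consume that topic to discover which
instances exist and what they report, without going through JMX.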

As for the "static task-level assignment", what I meant is that the mapping
from source-topic-partitions -> tasks is static, via the
"PartitionGrouper". A task won't switch from an active task to a standby
task; rather, an active task could be migrated, as a whole along with all
its assigned partitions, to another thread / process, and a new standby
task will be created on the host that this active task is migrating from.
So for the SAME task, its taskMetadata.assignedPartitions() will always
return the same partitions.
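
A small model of that invariant, in plain Java. This is only a sketch and
not the actual PartitionGrouper code: the class `StaticTaskAssignment`, the
method `groupPartitions`, the topic names and the host names are all
hypothetical, and the grouping rule (partition i of every source topic goes
to task i) is an assumption made for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model: partitions -> tasks is fixed, only tasks -> hosts changes. */
public class StaticTaskAssignment {

    /** Assumed grouping rule: partition i of every source topic belongs to task i. */
    public static Map<Integer, Set<String>> groupPartitions(List<String> topics,
                                                            int partitionsPerTopic) {
        Map<Integer, Set<String>> partitionsByTask = new TreeMap<>();
        for (int p = 0; p < partitionsPerTopic; p++) {
            Set<String> assigned = new TreeSet<>();
            for (String topic : topics) {
                assigned.add(topic + "-" + p);
            }
            partitionsByTask.put(p, assigned);
        }
        return partitionsByTask;
    }

    public static void main(String[] args) {
        // Computed once from the source topics; never recomputed on rebalance.
        Map<Integer, Set<String>> partitionsByTask =
            groupPartitions(Arrays.asList("orders", "payments"), 2);

        // The only thing a rebalance changes in this model: where each task runs.
        Map<Integer, String> hostByTask = new HashMap<>();
        hostByTask.put(0, "host-a");
        hostByTask.put(1, "host-a");

        Set<String> before = new TreeSet<>(partitionsByTask.get(1));
        hostByTask.put(1, "host-b"); // task 1 migrates as a whole to another host

        System.out.println("task 1 now on " + hostByTask.get(1)
            + " with partitions " + partitionsByTask.get(1));
        System.out.println("partitions unchanged: " + before.equals(partitionsByTask.get(1)));
    }
}
```

In this model a rebalance only rewrites the task -> host map; the
partitions of a given task are computed once and never touched.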

As for the `toString` function that we have today, I feel it has some
correlation with KIP-120, so I'm trying to coordinate the discussions here
(cc'ing Matthias as the owner of KIP-120). My understanding is that:

1. In KIP-120, the `toString` function of `KafkaStreams` will be removed
and instead the `Topology#describe` function will be introduced for users
to debug the topology BEFORE they start running their instance with the
topology. Hence the description won't contain any task information, as
tasks are not formed yet.
2. In KIP-130, we want to add the task-level information for monitoring
purposes, which is not static and can only be captured AFTER the instance
has started running. Again, I'm wondering whether, for KIP-130 alone, adding
the metrics mentioned in my previous email would suffice even for the use
case that you have mentioned.


Guozhang

On Wed, Mar 8, 2017 at 3:18 PM, Florian Hussonnois <fhussonn...@gmail.com>
wrote:

> Hi Guozhang
>
> Thank you for your feedback. I've started to look more deeply into the
> code. As you mention, it would be smarter to use the current
> StreamsMetadata API to expose this information.
>
> I think exposing metrics through JMX is great for building monitoring
> dashboards using some tools like jmxtrans and grafana.
> But for our use case we would like to expose the states directly from the
> application embedding the kstreams topologies.
> So we expect to be able to retrieve states in a programmatic way.
>
> For instance, we could imagine producing those states into a dedicated
> topic. That way, a third application could automatically discover all
> kafka-streams applications that can be monitored.
> In a production environment, that could clearly be a way to get a
> complete overview of a microservices architecture based on Kafka Streams.
>
> The toString() method gives a lot of information, but it can only be used
> for debugging purposes, not to build a topology visualization tool. Could
> we actually expose the same details about the stream topology from the
> StreamsMetadata API? The TaskMetadata class you have suggested could then
> contain information similar to what the toString method of the
> AbstractTask class returns?
>
> I can update the KIP in that way.
>
> Finally, I'm not sure I understand your last point: "Note that the
> task-level assignment information is static, i.e. it will not change during
> the runtime"
>
> Does that mean when a rebalance occurs new tasks are created for the new
> assignments and old ones just switch to a standby state ?
>
> Thanks,
>
> 2017-03-05 7:04 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
>
> > Hello Florian,
> >
> > Thanks for the KIP and your detailed explanation of your use case. I
> think
> > there are two dimensions to discuss on how to improve Streams'
> > debuggability (or more specifically state exposure for visualization).
> >
> > First question is "what information should we expose to the user". From
> > your KIP I saw generally three categories:
> >
> > 1. The state of the thread within a process, as you mentioned currently
> we
> > only expose the state of the process but not the finer grained per-thread
> > state.
> > 2. The state of the task. Currently the closest API to this is
> > StreamsMetadata,
> > however it aggregates the tasks across all threads and only present the
> > aggregated set of the assigned partitions / state stores etc. We can
> > consider extending this method to have a new StreamsMetadata#tasks()
> which
> > returns a TaskMetadata with the similar fields, and the
> > StreamsMetadata.stateStoreNames / etc would still be returning the
> > aggregated results but users can still "drill down" if they want.
> >
> > The second question is "how should we expose them to the user". For
> > example, you mentioned about consumedOffsetsByPartition in the
> activeTasks.
> > We could add this as a JMX metric based on fetch positions inside the
> > consumer layer (note that Streams is just embedding consumers) or we
> could
> > consider adding it into TaskMetadata. Either case it can be visualized
> for
> > monitoring. The reason we expose StreamsMetadata as well as State was
> that
> > it is expected to be "polled" in a programmatic way for interactive
> queries
> > and also for control flows (e.g. I would like to ONLY start running my
> > other topology until the first topology has been up and running) while
> for
> > your case it seems the main purpose is to continuously query them for
> > monitoring etc. Personally I'd prefer to expose them through JMX only for
> > such purposes, to keep the API simpler.
> >
> > So given your current motivations I'd suggest exposing the following
> > information as newly added metrics in Streams:
> >
> > 1. Thread-level state metric.
> > 2. Task-level hosted client identifier metric (e.g. host:port).
> > 3. Consumer-level per-topic/partition position metric (
> > https://kafka.apache.org/documentation/#topic_fetch_monitoring).
> >
> > Note that the task-level assignment information is static, i.e. it will
> not
> > change during the runtime at all and can be accessed from the
> `toString()`
> > function already even before the instance start running, so I think this
> > piece of information does not need to be exposed through JMX anymore.
> >
> > WDYT?
> >
> > Guozhang
> >
> >
> > On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy <damian....@gmail.com> wrote:
> >
> > > Hi Florian,
> > >
> > > Thanks for the KIP.
> > >
> > > It seems there is some overlap here with what we already have in
> > > KafkaStreams.allMetadata(). This currently returns a
> > > Collection<StreamsMetadata> where each StreamsMetadata instance holds
> the
> > > state stores and partition assignment for every instance of the
> > > KafkaStreams application. I'm wondering if that is good enough for what
> > you
> > > are trying to achieve? If not could it be modified to include the per
> > > Thread assignment?
> > >
> > > Thanks,
> > > Damian
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Wed, 1 Mar 2017 at 22:49 Florian Hussonnois <fhussonn...@gmail.com>
> > > wrote:
> > >
> > > > Hi Matthias,
> > > >
> > > > First, I will answer your last question.
> > > >
> > > > The main reason to have both TaskState#assignment and
> > > > TaskState#consumedOffsetsByPartition is that tasks have no consumed
> > > offsets
> > > > until at least one message is consumed for each partition even if
> > > previous
> > > > offsets exist for the consumer group.
> > > > So yes, these methods are redundant, as they only diverge at application
> > > > startup.
> > > >
> > > > About the use case, currently we are developing for a customer a
> > little
> > > > framework based on KafkaStreams which transform/denormalize data
> before
> > > > ingesting into hadoop.
> > > >
> > > > We have a cluster of workers (SpringBoot) which instantiate KStreams
> > > > topologies dynamically based on dataflow configurations.
> > > > Each configuration describes a topic to consume and how to process
> > > messages
> > > > (this looks like NiFi processors API).
> > > >
> > > > Our architecture is inspired from KafkaConnect. We have topics for
> > > configs
> > > > and states which are consumed by each workers (actually we have
> reused
> > > some
> > > > internal classes of the Connect API).
> > > >
> > > > Now, we would like to develop UIs to visualize topics and partitions
> > > > consumed by our worker applications.
> > > >
> > > > Also, I think it would be nice to be able, in the future, to develop
> > web
> > > > UIs similar to Spark but for KafkaStreams to visualize DAGs...so
> maybe
> > > this
> > > > KIP is just a first step.
> > > >
> > > > Thanks,
> > > >
> > > > 2017-03-01 22:52 GMT+01:00 Matthias J. Sax <matth...@confluent.io>:
> > > >
> > > > > Thanks for the KIP.
> > > > >
> > > > > I am wondering a little bit, why you need to expose this
> information.
> > > > > Can you describe some use cases?
> > > > >
> > > > > Would it be worth to unify this new API with KafkaStreams#state()
> to
> > > get
> > > > > the overall state of an application without the need to call two
> > > > > different methods? Not sure how this unified API might look like
> > > though.
> > > > >
> > > > >
> > > > > One minor comment about the API: TaskState#assignment seems to be
> > > > > redundant. It should be the same as
> > > > > TaskState#consumedOffsetsByPartition.keySet()
> > > > >
> > > > > Or do I miss something?
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 3/1/17 5:19 AM, Florian Hussonnois wrote:
> > > > > > Hi Eno,
> > > > > >
> > > > > > Yes, but the state() method only returns the global state of the
> > > > > > KafkaStream application (ie: CREATED, RUNNING, REBALANCING,
> > > > > > PENDING_SHUTDOWN, NOT_RUNNING).
> > > > > >
> > > > > > An alternative to this KIP would be to change this method to
> return
> > > > more
> > > > > > information instead of adding a new method.
> > > > > >
> > > > > > 2017-03-01 13:46 GMT+01:00 Eno Thereska <eno.there...@gmail.com
> >:
> > > > > >
> > > > > >> Thanks Florian,
> > > > > >>
> > > > > >> Have you had a chance to look at the new state methods in
> 0.10.2,
> > > > e.g.,
> > > > > >> KafkaStreams.state()?
> > > > > >>
> > > > > >> Thanks
> > > > > >> Eno
> > > > > >>> On 1 Mar 2017, at 11:54, Florian Hussonnois <
> > fhussonn...@gmail.com
> > > >
> > > > > >> wrote:
> > > > > >>>
> > > > > >>> Hi all,
> > > > > >>>
> > > > > >>> I have just created KIP-130 to add a new method to the
> > KafkaStreams
> > > > API
> > > > > >> in
> > > > > >>> order to expose the states of threads and active tasks.
> > > > > >>>
> > > > > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+
> > > > > >> 130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
> > > > > >>>
> > > > > >>>
> > > > > >>> Thanks,
> > > > > >>>
> > > > > >>> --
> > > > > >>> Florian HUSSONNOIS
> > > > > >>
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Florian HUSSONNOIS
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> Florian HUSSONNOIS
>



-- 
-- Guozhang
