Florian,

I am just wondering: if we keep .toString(), what should the
implementation look like?


-Matthias

On 4/19/17 2:42 PM, Florian Hussonnois wrote:
> Hi Matthias,
> 
> So sorry for the delay in replying to you. For now, I think we can keep
> KafkaStreams#toString() as it is.
> It's always preferable to have an implementation for toString method.
> 
> 2017-04-14 4:08 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
> 
>> Florian,
>>
>>>>> What about KafkaStreams#toString() method?
>>>>>
>>>>> I think, we want to deprecate it as with KIP-120 and the changes of
>> this
>>>>> KIP, is gets obsolete.
>>
>> Any thoughts about this? For me, this is the last open point to discuss
>> (or what should be reflected in the KIP in case you agree) before I can
>> put my vote on the VOTE thread do did start already.
>>
>> -Matthias
>>
>>
>> On 4/11/17 12:18 AM, Damian Guy wrote:
>>> Hi Florian,
>>>
>>> Thanks for the updates. The KIP is looking good.
>>>
>>> Cheers,
>>> Damian
>>>
>>> On Fri, 7 Apr 2017 at 22:41 Matthias J. Sax <matth...@confluent.io>
>> wrote:
>>>
>>>> What about KafkaStreams#toString() method?
>>>>
>>>> I think, we want to deprecate it as with KIP-120 and the changes of this
>>>> KIP, is gets obsolete.
>>>>
>>>> If we do so, please update the KIP accordingly.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>> On 3/28/17 7:00 PM, Matthias J. Sax wrote:
>>>>> Thanks for updating the KIP!
>>>>>
>>>>> I think it's good as is -- I would not add anything more to
>> TaskMetadata.
>>>>>
>>>>> About subtopologies and tasks. We do have the concept of subtopologies
>>>>> already in KIP-120. It's only missing and ID that allow to link a
>>>>> subtopology to a task.
>>>>>
>>>>> IMHO, adding a simple variable to `Subtopoloy` that provide the id
>>>>> should be sufficient. We can simply document in the JavaDocs how
>>>>> Subtopology and TaskMetadata can be linked to each other.
>>>>>
>>>>> I did update KIP-120 accordingly.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 3/28/17 3:45 PM, Florian Hussonnois wrote:
>>>>>> Hi all,
>>>>>>
>>>>>> I've updated the KIP and the PR to reflect your suggestions.
>>>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+
>> 130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>>>>> https://github.com/apache/kafka/pull/2612
>>>>>>
>>>>>> Also, I've exposed property StreamThread#state as a string through the
>>>>>> new class ThreadMetadata.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> 2017-03-27 23:40 GMT+02:00 Florian Hussonnois <fhussonn...@gmail.com
>>>>>> <mailto:fhussonn...@gmail.com>>:
>>>>>>
>>>>>>     Hi Guozhang, Matthias,
>>>>>>
>>>>>>     It's a great idea to add sub topologies descriptions. This would
>>>>>>     help developers to better understand topology concept.
>>>>>>
>>>>>>     I agree that is not really user-friendly to check if
>>>>>>     `StreamsMetadata#streamThreads` is not returning null.
>>>>>>
>>>>>>     The method name localThreadsMetadata looks good. In addition, it's
>>>>>>     more simple to build ThreadMetadata instances from the
>> `StreamTask`
>>>>>>     class than from `StreamPartitionAssignor` class.
>>>>>>
>>>>>>     I will work on modifications. As I understand, I have to add the
>>>>>>     property subTopologyId property to the TaskMetadata class - Am I
>>>> right ?
>>>>>>
>>>>>>     Thanks,
>>>>>>
>>>>>>     2017-03-26 0:25 GMT+01:00 Guozhang Wang <wangg...@gmail.com
>>>>>>     <mailto:wangg...@gmail.com>>:
>>>>>>
>>>>>>         Re 1): this is a good point. May be we can move
>>>>>>         `StreamsMetadata#streamThreads` as
>>>>>>         `KafkaStreams#localThreadsMetadata`?
>>>>>>
>>>>>>         3): this is a minor suggestion about function name of
>>>>>>         `assignedPartitions`, to `topicPartitions` to be consistent
>> with
>>>>>>         `StreamsMetadata`?
>>>>>>
>>>>>>
>>>>>>         Guozhang
>>>>>>
>>>>>>         On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax
>>>>>>         <matth...@confluent.io <mailto:matth...@confluent.io>> wrote:
>>>>>>
>>>>>>             Thanks for the progress on this KIP. I think we are on the
>>>>>>             right path!
>>>>>>
>>>>>>             Couple of comments/questions:
>>>>>>
>>>>>>             (1) Why do we not consider the "rejected alternative" to
>> add
>>>>>>             the method
>>>>>>             to KafkaStreams? The comment on #streamThreads() says:
>>>>>>
>>>>>>             "Note this method will return <code>null</code> if called
>> on
>>>>>>             {@link
>>>>>>             StreamsMetadata} which represent a remote application."
>>>>>>
>>>>>>             Thus, if we cannot get any remote metadata, it seems not
>>>>>>             straight
>>>>>>             forward to not add it to KafkaStreams directly -- this
>> would
>>>>>>             avoid
>>>>>>             invalid calls and `null` return value in the first place.
>>>>>>
>>>>>>             I like the idea about exposing sub-topologies.:
>>>>>>
>>>>>>             (2a) I would recommend to rename `topicsGroupId` to
>>>>>>             `subTopologyId` :)
>>>>>>
>>>>>>             (2b) We could add this to KIP-120 already. However, I
>> would
>>>>>>             not just
>>>>>>             link both via name, but leverage KIP-120 directly, and
>> add a
>>>>>>             "Subtopology" member to the TaskMetadata class.
>>>>>>
>>>>>>
>>>>>>             Overall, I like the distinction of KIP-120 only exposing
>>>>>>             "static"
>>>>>>             information that can be determined before the topology
>> get's
>>>>>>             started,
>>>>>>             while this KIP allow to access runtime information.
>>>>>>
>>>>>>
>>>>>>
>>>>>>             -Matthias
>>>>>>
>>>>>>
>>>>>>             On 3/22/17 12:42 PM, Guozhang Wang wrote:
>>>>>>             > Thanks for the updated KIP, and sorry for the late
>>>> replies!
>>>>>>             >
>>>>>>             > I think a little bit more about KIP-130, and I feel that
>>>>>>             if we are going
>>>>>>             > to deprecate the `toString` function (it is not
>> explicitly
>>>>>>             said in the
>>>>>>             > KIP, so I'm not sure if you plan to still keep the
>>>>>>             > `KafkaStreams#toString` as is or are going to replace it
>>>>>>             with the
>>>>>>             > proposed APIs) with the proposed ones, it may be okay.
>>>> More
>>>>>>             > specifically, after both KIP-120 and KIP-130:
>>>>>>             >
>>>>>>             > 1. users can use `#describe` function to check the
>>>>>>             generated topology
>>>>>>             > before calling `KafkaStreams#start`, which is static
>>>>>>             information.
>>>>>>             > 2. users can use the `StreamsMetadata -> ThreadMetadata
>> ->
>>>>>>             TaskMetadata`
>>>>>>             > programmatically after called `KafkaStreams#start` to
>> get
>>>> the
>>>>>>             > dynamically changeable information.
>>>>>>             >
>>>>>>             > One thing I'm still not sure though, is that in
>>>>>>             `TaskMetadata` we only
>>>>>>             > have the TaskId and assigned partitions, whereas in
>>>>>>             > "TopologyDescription" introduced in KIP-120, it will
>>>>>>             simply describe the
>>>>>>             > whole topology possibly composed of multiple
>>>>>>             sub-topologies. So it is
>>>>>>             > hard for users to tell which sub-topology is executed
>>>>>>             under which task
>>>>>>             > on-the-fly.
>>>>>>             >
>>>>>>             > Hence I'm thinking if we can expose the
>> "sub-topology-id"
>>>>>>             (named as
>>>>>>             > topicsGroupId internally) in
>>>>>>             TopologyDescription#Subtopology, and then
>>>>>>             > from the task id which is essentially "sub-topology-id
>>>> DASH
>>>>>>             > partition-group-id" users can make the link, though it
>> is
>>>>>>             still not that
>>>>>>             > straight-forward.
>>>>>>             >
>>>>>>             > Thoughts?
>>>>>>             >
>>>>>>             > Guozhang
>>>>>>             >
>>>>>>             >
>>>>>>             >
>>>>>>             > On Wed, Mar 15, 2017 at 3:16 PM, Florian Hussonnois
>>>>>>             > <fhussonn...@gmail.com <mailto:fhussonn...@gmail.com>
>>>>>>             <mailto:fhussonn...@gmail.com
>>>>>>             <mailto:fhussonn...@gmail.com>>> wrote:
>>>>>>             >
>>>>>>             >     Thanks Guozhang for pointing me to the KIP-120.
>>>>>>             >
>>>>>>             >     I've made some modifications to the KIP. I also
>>>> proposed a new PR
>>>>>>             >     (there is
>>>>>>             >     still some tests to make).
>>>>>>             >
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+
>> 130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>>>>>             <
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+
>> 130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>>>>
>>>>>>             >     <
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+
>> 130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>>>>>             <
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+
>> 130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>>>>>
>>>>>>             >
>>>>>>             >     Exposing consumed offsets through JMX is sufficient
>>>> for debugging
>>>>>>             >     purpose.
>>>>>>             >     But I think this could be part to another JIRA as
>>>> there is no impact to
>>>>>>             >     public API.
>>>>>>             >
>>>>>>             >     Thanks
>>>>>>             >
>>>>>>             >     2017-03-10 22:35 GMT+01:00 Guozhang Wang <
>>>> wangg...@gmail.com <mailto:wangg...@gmail.com>
>>>>>>             >     <mailto:wangg...@gmail.com <mailto:
>> wangg...@gmail.com
>>>>>>> :
>>>>>>
>>>>>>             >
>>>>>>             >     > Hello Florian,
>>>>>>             >     >
>>>>>>             >     > As for programmatically discover monitoring data
>> by
>>>>>>             piping metrics
>>>>>>             >     into a
>>>>>>             >     > dedicated topic. I think you can actually use a
>>>>>>             >     KafkaMetricsReporter which
>>>>>>             >     > pipes the polled metric values into a pre-defined
>>>>>>             topic (note that
>>>>>>             >     in Kafka
>>>>>>             >     > the MetricsReporter is simply an interface and
>> users
>>>>>>             can build
>>>>>>             >     their own
>>>>>>             >     > impl in addition to the JMXReporter), for example
>> :
>>>>>>             >     >
>>>>>>             >     > https://github.com/krux/kafka-metrics-reporter
>>>>>>             <https://github.com/krux/kafka-metrics-reporter>
>>>>>>             >     <https://github.com/krux/kafka-metrics-reporter
>>>>>>             <https://github.com/krux/kafka-metrics-reporter>>
>>>>>>             >     >
>>>>>>             >     > As for the "static task-level assignment", what I
>>>>>>             meant is that
>>>>>>             >     the mapping
>>>>>>             >     > from source-topic-partitions -> tasks are static,
>>>>>>             via the
>>>>>>             >     > "PartitionGrouper", and a task won't switch from
>> an
>>>>>>             active task to a
>>>>>>             >     > standby task, it is actually that an active task
>>>>>>             could be
>>>>>>             >     migrated, as a
>>>>>>             >     > whole along with all its assigned partitions, to
>>>>>>             another thread /
>>>>>>             >     process
>>>>>>             >     > and a new standby task will be created on the host
>>>>>>             that this
>>>>>>             >     active task is
>>>>>>             >     > migrating from. So for the SAME task, its
>>>> taskMetadata.
>>>>>>             >     > assignedPartitions()
>>>>>>             >     > will always return you the same partitions.
>>>>>>             >     >
>>>>>>             >     > As for the `toString` function that what we have
>>>>>>             today, I feel it
>>>>>>             >     has some
>>>>>>             >     > correlations with KIP-120 so I'm trying to
>>>>>>             coordinate some
>>>>>>             >     discussions here
>>>>>>             >     > (cc'ing Matthias as the owner of KIP-120). My
>>>>>>             understand is that:
>>>>>>             >     >
>>>>>>             >     > 1. In KIP-120, the `toString` function of
>>>>>>             `KafkaStreams` will be
>>>>>>             >     removed
>>>>>>             >     > and instead the `Topology#describe` function will
>> be
>>>>>>             introduced
>>>>>>             >     for users
>>>>>>             >     > to debug the topology BEFORE start running their
>>>>>>             instance with the
>>>>>>             >     > topology. And hence the description won't contain
>>>>>>             any task
>>>>>>             >     information as
>>>>>>             >     > they are not formed yet.
>>>>>>             >     > 2. In KIP-130, we want to add the task-level
>>>>>>             information for
>>>>>>             >     monitoring
>>>>>>             >     > purposes, which is not static and can only be
>>>>>>             captured AFTER the
>>>>>>             >     instance
>>>>>>             >     > has started running. Again I'm wondering for
>> KIP-130
>>>>>>             alone if
>>>>>>             >     adding those
>>>>>>             >     > metrics mentioned in my previous email would
>> suffice
>>>>>>             even for the
>>>>>>             >     use case
>>>>>>             >     > that you have mentioned.
>>>>>>             >     >
>>>>>>             >     >
>>>>>>             >     > Guozhang
>>>>>>             >     >
>>>>>>             >     > On Wed, Mar 8, 2017 at 3:18 PM, Florian Hussonnois
>>>>>>             >     <fhussonn...@gmail.com <mailto:
>> fhussonn...@gmail.com>
>>>>>>             <mailto:fhussonn...@gmail.com <mailto:
>> fhussonn...@gmail.com
>>>>>>>
>>>>>>
>>>>>>             >     > wrote:
>>>>>>             >     >
>>>>>>             >     > > Hi Guozhang
>>>>>>             >     > >
>>>>>>             >     > > Thank you for your feedback. I've started to
>> look
>>>>>>             more deeply
>>>>>>             >     into the
>>>>>>             >     > > code. As you mention, it would be more clever to
>>>>>>             use the current
>>>>>>             >     > > StreamMetadata API to expose these information.
>>>>>>             >     > >
>>>>>>             >     > > I think exposing metrics through JMX is great
>> for
>>>>>>             building
>>>>>>             >     monitoring
>>>>>>             >     > > dashboards using some tools like jmxtrans and
>>>> grafana.
>>>>>>             >     > > But for our use case we would like to expose the
>>>>>>             states
>>>>>>             >     directely from
>>>>>>             >     > the
>>>>>>             >     > > application embedding the kstreams topologies.
>>>>>>             >     > > So we expect to be able to retrieve states in a
>>>>>>             programmatic way.
>>>>>>             >     > >
>>>>>>             >     > > For instance, we could imagin to produce those
>>>>>>             states into a
>>>>>>             >     dedicated
>>>>>>             >     > > topic. In that way a third application could
>>>>>>             automatically
>>>>>>             >     discover all
>>>>>>             >     > > kafka-streams applications which could be
>>>> monitored.
>>>>>>             >     > > In production environment, that can be clearly a
>>>>>>             solution to have a
>>>>>>             >     > > complete overview of a microservices
>> architecture
>>>>>>             based on Kafka
>>>>>>             >     Streams.
>>>>>>             >     > >
>>>>>>             >     > > The toString() method give a lots of information
>>>>>>             it can only be
>>>>>>             >     used for
>>>>>>             >     > > debugging purpose but not to build a topologies
>>>>>>             visualization
>>>>>>             >     tool. We
>>>>>>             >     > > could actually expose same details about the
>>>>>>             stream topology
>>>>>>             >     from the
>>>>>>             >     > > StreamMetadata API ? So the TaskMetadata class
>> you
>>>>>>             have
>>>>>>             >     suggested could
>>>>>>             >     > > contains similar information that ones return by
>>>>>>             the toString
>>>>>>             >     method from
>>>>>>             >     > > AbstractTask class ?
>>>>>>             >     > >
>>>>>>             >     > > I can update the KIP in that way.
>>>>>>             >     > >
>>>>>>             >     > > Finally,  I'm not sure to understand your last
>>>>>>             point :* "Note
>>>>>>             >     that the
>>>>>>             >     > > task-level assignment information is static,
>> i.e.
>>>>>>             it will not change
>>>>>>             >     > during
>>>>>>             >     > > the runtime" *
>>>>>>             >     > >
>>>>>>             >     > > Does that mean when a rebalance occurs new tasks
>>>>>>             are created for
>>>>>>             >     the new
>>>>>>             >     > > assignments and old ones just switch to a
>> standby
>>>>>>             state ?
>>>>>>             >     > >
>>>>>>             >     > > Thanks,
>>>>>>             >     > >
>>>>>>             >     > > 2017-03-05 7:04 GMT+01:00 Guozhang Wang
>>>>>>             <wangg...@gmail.com <mailto:wangg...@gmail.com>
>>>>>>             >     <mailto:wangg...@gmail.com <mailto:
>> wangg...@gmail.com
>>>>>>> :
>>>>>>
>>>>>>             >     > >
>>>>>>             >     > > > Hello Florian,
>>>>>>             >     > > >
>>>>>>             >     > > > Thanks for the KIP and your detailed
>> explanation
>>>>>>             of your use
>>>>>>             >     case. I
>>>>>>             >     > > think
>>>>>>             >     > > > there are two dimensions to discuss on how to
>>>>>>             improve Streams'
>>>>>>             >     > > > debuggability (or more specifically state
>>>>>>             exposure for
>>>>>>             >     visualization).
>>>>>>             >     > > >
>>>>>>             >     > > > First question is "what information should we
>>>>>>             expose to the
>>>>>>             >     user". From
>>>>>>             >     > > > your KIP I saw generally three categories:
>>>>>>             >     > > >
>>>>>>             >     > > > 1. The state of the thread within a process,
>> as
>>>>>>             you mentioned
>>>>>>             >     currently
>>>>>>             >     > > we
>>>>>>             >     > > > only expose the state of the process but not
>> the
>>>>>>             finer grained
>>>>>>             >     > per-thread
>>>>>>             >     > > > state.
>>>>>>             >     > > > 2. The state of the task. Currently the most
>>>>>>             close API to this is
>>>>>>             >     > > > StreamsMetadata,
>>>>>>             >     > > > however it aggregates the tasks across all
>>>>>>             threads and only
>>>>>>             >     present the
>>>>>>             >     > > > aggregated set of the assigned partitions /
>>>>>>             state stores etc.
>>>>>>             >     We can
>>>>>>             >     > > > consider extending this method to have a new
>>>>>>             >     StreamsMetadata#tasks()
>>>>>>             >     > > which
>>>>>>             >     > > > returns a TaskMetadata with the similar
>> fields,
>>>>>>             and the
>>>>>>             >     > > > StreamsMetadata.stateStoreNames / etc would
>>>>>>             still be returning the
>>>>>>             >     > > > aggregated results but users can still "drill
>>>>>>             down" if they want.
>>>>>>             >     > > >
>>>>>>             >     > > > The second question is "how should we expose
>>>>>>             them to the
>>>>>>             >     user". For
>>>>>>             >     > > > example, you mentioned about
>>>>>>             consumedOffsetsByPartition in the
>>>>>>             >     > > activeTasks.
>>>>>>             >     > > > We could add this as a JMX metric based on
>> fetch
>>>>>>             positions
>>>>>>             >     inside the
>>>>>>             >     > > > consumer layer (note that Streams is just
>>>>>>             embedding consumers)
>>>>>>             >     or we
>>>>>>             >     > > could
>>>>>>             >     > > > consider adding it into TaskMetadata. Either
>>>>>>             case it can be
>>>>>>             >     visualized
>>>>>>             >     > > for
>>>>>>             >     > > > monitoring. The reason we expose
>> StreamsMetadata
>>>>>>             as well as
>>>>>>             >     State was
>>>>>>             >     > > that
>>>>>>             >     > > > it is expected to be "polled" in a
>> programmatic
>>>>>>             way for
>>>>>>             >     interactive
>>>>>>             >     > > queries
>>>>>>             >     > > > and also for control flows (e.g. I would like
>> to
>>>>>>             ONLY start
>>>>>>             >     running my
>>>>>>             >     > > > other topology until the first topology has
>> been
>>>>>>             up and
>>>>>>             >     running) while
>>>>>>             >     > > for
>>>>>>             >     > > > your case it seems the main purpose is to
>>>>>>             continuously query
>>>>>>             >     them for
>>>>>>             >     > > > monitoring etc. Personally I'd prefer to
>> expose
>>>>>>             them as JMX
>>>>>>             >     only for
>>>>>>             >     > such
>>>>>>             >     > > > purposes only to have a simpler API.
>>>>>>             >     > > >
>>>>>>             >     > > > So given your current motivations I'd suggest
>>>>>>             expose the following
>>>>>>             >     > > > information as newly added metrics in Streams:
>>>>>>             >     > > >
>>>>>>             >     > > > 1. Thread-level state metric.
>>>>>>             >     > > > 2. Task-level hosted client identifier metric
>>>>>>             (e.g. host:port).
>>>>>>             >     > > > 3. Consumer-level per-topic/partition position
>>>>>>             metric (
>>>>>>             >     > > >
>>>>>>
>>>> https://kafka.apache.org/documentation/#topic_fetch_monitoring
>>>>>>             <
>>>> https://kafka.apache.org/documentation/#topic_fetch_monitoring>
>>>>>>             >
>>>>>>              <
>>>> https://kafka.apache.org/documentation/#topic_fetch_monitoring
>>>>>>             <
>>>> https://kafka.apache.org/documentation/#topic_fetch_monitoring>>).
>>>>>>             >     > > >
>>>>>>             >     > > > Note that the task-level assignment
>> information
>>>> is static,
>>>>>>             >     i.e. it will
>>>>>>             >     > > not
>>>>>>             >     > > > change during the runtime at all and can be
>>>> accessed from the
>>>>>>             >     > > `toString()`
>>>>>>             >     > > > function already even before the instance
>> start
>>>> running, so I
>>>>>>             >     think
>>>>>>             >     > this
>>>>>>             >     > > > piece of information do not need to be exposed
>>>> through JMX
>>>>>>             >     anymore.
>>>>>>             >     > > >
>>>>>>             >     > > > WDYT?
>>>>>>             >     > > >
>>>>>>             >     > > > Guozhang
>>>>>>             >     > > >
>>>>>>             >     > > >
>>>>>>             >     > > > On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy
>>>>>>             >     <damian....@gmail.com <mailto:damian....@gmail.com>
>>>>>>             <mailto:damian....@gmail.com <mailto:damian....@gmail.com
>>>>>>>
>>>>>>             >     > wrote:
>>>>>>             >     > > >
>>>>>>             >     > > > > Hi Florian,
>>>>>>             >     > > > >
>>>>>>             >     > > > > Thanks for the KIP.
>>>>>>             >     > > > >
>>>>>>             >     > > > > It seems there is some overlap here with
>> what
>>>> we already have in
>>>>>>             >     > > > > KafkaStreams.allMetadata(). This currently
>>>> returns a
>>>>>>             >     > > > > Collection<StreamsMetadata> where each
>>>> StreamsMetadata
>>>>>>             >     instance holds
>>>>>>             >     > > the
>>>>>>             >     > > > > state stores and partition assignment for
>>>> every instance of the
>>>>>>             >     > > > > KafkaStreams application. I'm wondering if
>>>> that is good
>>>>>>             >     enough for
>>>>>>             >     > what
>>>>>>             >     > > > you
>>>>>>             >     > > > > are trying to achieve? If not could it be
>>>> modified to
>>>>>>             >     include the per
>>>>>>             >     > > > > Thread assignment?
>>>>>>             >     > > > >
>>>>>>             >     > > > > Thanks,
>>>>>>             >     > > > > Damian
>>>>>>             >     > > > >
>>>>>>             >     > > > >
>>>>>>             >     > > > >
>>>>>>             >     > > > >
>>>>>>             >     > > > >
>>>>>>             >     > > > >
>>>>>>             >     > > > > On Wed, 1 Mar 2017 at 22:49 Florian
>>>> Hussonnois <
>>>>>>             >     > fhussonn...@gmail.com <mailto:
>> fhussonn...@gmail.com
>>>>>
>>>>>>             <mailto:fhussonn...@gmail.com <mailto:
>> fhussonn...@gmail.com
>>>>>>>
>>>>>>
>>>>>>             >     > > > > wrote:
>>>>>>             >     > > > >
>>>>>>             >     > > > > > Hi Matthias,
>>>>>>             >     > > > > >
>>>>>>             >     > > > > > First, I will answer to your last
>> question.
>>>>>>             >     > > > > >
>>>>>>             >     > > > > > The main reason to have both
>>>>>>             TaskState#assignment and
>>>>>>             >     > > > > > TaskState#consumedOffsetsByPartition is
>> that
>>>>>>             tasks have no
>>>>>>             >     > consumed
>>>>>>             >     > > > > offsets
>>>>>>             >     > > > > > until at least one message is consumed for
>>>>>>             each partition
>>>>>>             >     even if
>>>>>>             >     > > > > previous
>>>>>>             >     > > > > > offsets exist for the consumer group.
>>>>>>             >     > > > > > So yes this methods are redundant as it
>> only
>>>>>>             diverge at
>>>>>>             >     application
>>>>>>             >     > > > > > startup.
>>>>>>             >     > > > > >
>>>>>>             >     > > > > > About the use case, currently we are
>>>>>>             developping for a
>>>>>>             >     customer a
>>>>>>             >     > > > little
>>>>>>             >     > > > > > framework based on KafkaStreams which
>>>>>>             >     transform/denormalize data
>>>>>>             >     > > before
>>>>>>             >     > > > > > ingesting into hadoop.
>>>>>>             >     > > > > >
>>>>>>             >     > > > > > We have a cluster of workers (SpringBoot)
>>>>>>             which instantiate
>>>>>>             >     > KStreams
>>>>>>             >     > > > > > topologies dynamicaly based on dataflow
>>>>>>             configurations.
>>>>>>             >     > > > > > Each configuration describes a topic to
>>>>>>             consume and how to
>>>>>>             >     process
>>>>>>             >     > > > > messages
>>>>>>             >     > > > > > (this looks like NiFi processors API).
>>>>>>             >     > > > > >
>>>>>>             >     > > > > > Our architecture is inspired from
>>>>>>             KafkaConnect. We have
>>>>>>             >     topics for
>>>>>>             >     > > > > configs
>>>>>>             >     > > > > > and states which are consumed by each
>>>>>>             workers (actually we
>>>>>>             >     have
>>>>>>             >     > > reused
>>>>>>             >     > > > > some
>>>>>>             >     > > > > > internals classes to the connect API).
>>>>>>             >     > > > > >
>>>>>>             >     > > > > > Now, we would like to develop UIs to
>>>>>>             visualize topics and
>>>>>>             >     > partitions
>>>>>>             >     > > > > > consumed by our worker applications.
>>>>>>             >     > > > > >
>>>>>>             >     > > > > > Also, I think it would be nice to be able,
>>>>>>             in the futur, to
>>>>>>             >     > develop
>>>>>>             >     > > > web
>>>>>>             >     > > > > > UIs similar to Spark but for KafkaStreams
>> to
>>>>>>             visualize
>>>>>>             >     DAGs...so
>>>>>>             >     > > maybe
>>>>>>             >     > > > > this
>>>>>>             >     > > > > > KIP is just a first step.
>>>>>>             >     > > > > >
>>>>>>             >     > > > > > Thanks,
>>>>>>             >     > > > > >
>>>>>>             >     > > > > > 2017-03-01 22:52 GMT+01:00 Matthias J. Sax
>>>>>>             >     <matth...@confluent.io <mailto:
>> matth...@confluent.io>
>>>>>>             <mailto:matth...@confluent.io <mailto:
>> matth...@confluent.io
>>>>>>
>>>>>>
>>>>>>             >     > >:
>>>>>>             >     > > > > >
>>>>>>             >     > > > > > > Thanks for the KIP.
>>>>>>             >     > > > > > >
>>>>>>             >     > > > > > > I am wondering a little bit, why you
>> need
>>>>>>             to expose this
>>>>>>             >     > > information.
>>>>>>             >     > > > > > > Can you describe some use cases?
>>>>>>             >     > > > > > >
>>>>>>             >     > > > > > > Would it be worth to unify this new API
>>>> with
>>>>>>             >     KafkaStreams#state()
>>>>>>             >     > > to
>>>>>>             >     > > > > get
>>>>>>             >     > > > > > > the overall state of an application
>>>>>>             without the need to
>>>>>>             >     call two
>>>>>>             >     > > > > > > different methods? Not sure how this
>>>>>>             unified API might
>>>>>>             >     look like
>>>>>>             >     > > > > though.
>>>>>>             >     > > > > > >
>>>>>>             >     > > > > > >
>>>>>>             >     > > > > > > One minor comment about the API:
>>>>>>             TaskState#assignment
>>>>>>             >     seems to be
>>>>>>             >     > > > > > > redundant. It should be the same as
>>>>>>             >     > > > > > >
>>>> TaskState#consumedOffsetsByPartition.keySet()
>>>>>>             >     > > > > > >
>>>>>>             >     > > > > > > Or do I miss something?
>>>>>>             >     > > > > > >
>>>>>>             >     > > > > > >
>>>>>>             >     > > > > > > -Matthias
>>>>>>             >     > > > > > >
>>>>>>             >     > > > > > > On 3/1/17 5:19 AM, Florian Hussonnois
>>>> wrote:
>>>>>>             >     > > > > > > > Hi Eno,
>>>>>>             >     > > > > > > >
>>>>>>             >     > > > > > > > Yes, but the state() method only
>> returns
>>>>>>             the global
>>>>>>             >     state of
>>>>>>             >     > the
>>>>>>             >     > > > > > > > KafkaStream application (ie: CREATED,
>>>>>>             RUNNING,
>>>>>>             >     REBALANCING,
>>>>>>             >     > > > > > > > PENDING_SHUTDOWN, NOT_RUNNING).
>>>>>>             >     > > > > > > >
>>>>>>             >     > > > > > > > An alternative to this KIP would be to
>>>>>>             change this
>>>>>>             >     method to
>>>>>>             >     > > return
>>>>>>             >     > > > > > more
>>>>>>             >     > > > > > > > information instead of adding a new
>>>> method.
>>>>>>             >     > > > > > > >
>>>>>>             >     > > > > > > > 2017-03-01 13:46 GMT+01:00 Eno
>> Thereska
>>>> <
>>>>>>             >     > eno.there...@gmail.com
>>>>>>             <mailto:eno.there...@gmail.com>
>>>>>>             <mailto:eno.there...@gmail.com <mailto:
>>>> eno.there...@gmail.com>>
>>>>>>             >     > > >:
>>>>>>             >     > > > > > > >
>>>>>>             >     > > > > > > >> Thanks Florian,
>>>>>>             >     > > > > > > >>
>>>>>>             >     > > > > > > >> Have you had a chance to look at the
>>>> new state methods in
>>>>>>             >     > > 0.10.2,
>>>>>>             >     > > > > > e.g.,
>>>>>>             >     > > > > > > >> KafkaStreams.state()?
>>>>>>             >     > > > > > > >>
>>>>>>             >     > > > > > > >> Thanks
>>>>>>             >     > > > > > > >> Eno
>>>>>>             >     > > > > > > >>> On 1 Mar 2017, at 11:54, Florian
>>>> Hussonnois <
>>>>>>             >     > > > fhussonn...@gmail.com
>>>>>>             <mailto:fhussonn...@gmail.com> <mailto:
>>>> fhussonn...@gmail.com
>>>>>>             <mailto:fhussonn...@gmail.com>>
>>>>>>
>>>>>>             >     > > > > >
>>>>>>             >     > > > > > > >> wrote:
>>>>>>             >     > > > > > > >>>
>>>>>>             >     > > > > > > >>> Hi all,
>>>>>>             >     > > > > > > >>>
>>>>>>             >     > > > > > > >>> I have just created KIP-130 to add a
>>>>>>             new method to the
>>>>>>             >     > > > KafkaStreams
>>>>>>             >     > > > > > API
>>>>>>             >     > > > > > > >> in
>>>>>>             >     > > > > > > >>> order to expose the states of
>> threads
>>>>>>             and active tasks.
>>>>>>             >     > > > > > > >>>
>>>>>>             >     > > > > > > >>>
>>>>>>             >
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+
>>>>>>             <https://cwiki.apache.org/confluence/display/KAFKA/KIP+>
>>>>>>             >
>>>>>>              <https://cwiki.apache.org/confluence/display/KAFKA/KIP+
>>>>>>             <https://cwiki.apache.org/confluence/display/KAFKA/KIP+>>
>>>>>>             >     > > > > > > >>
>>>>>>             130%3A+Expose+states+of+active+tasks+to+KafkaStreams+
>>>>>>             >     > public+API
>>>>>>             >     > > > > > > >>>
>>>>>>             >     > > > > > > >>>
>>>>>>             >     > > > > > > >>> Thanks,
>>>>>>             >     > > > > > > >>>
>>>>>>             >     > > > > > > >>> --
>>>>>>             >     > > > > > > >>> Florian HUSSONNOIS
>>>>>>             >     > > > > > > >>
>>>>>>             >     > > > > > > >>
>>>>>>             >     > > > > > > >
>>>>>>             >     > > > > > > >
>>>>>>             >     > > > > > >
>>>>>>             >     > > > > > >
>>>>>>             >     > > > > >
>>>>>>             >     > > > > >
>>>>>>             >     > > > > > --
>>>>>>             >     > > > > > Florian HUSSONNOIS
>>>>>>             >     > > > > >
>>>>>>             >     > > > >
>>>>>>             >     > > >
>>>>>>             >     > > >
>>>>>>             >     > > >
>>>>>>             >     > > > --
>>>>>>             >     > > > -- Guozhang
>>>>>>             >     > > >
>>>>>>             >     > >
>>>>>>             >     > >
>>>>>>             >     > >
>>>>>>             >     > > --
>>>>>>             >     > > Florian HUSSONNOIS
>>>>>>             >     > >
>>>>>>             >     >
>>>>>>             >     >
>>>>>>             >     >
>>>>>>             >     > --
>>>>>>             >     > -- Guozhang
>>>>>>             >     >
>>>>>>             >
>>>>>>             >
>>>>>>             >
>>>>>>             >     --
>>>>>>             >     Florian HUSSONNOIS
>>>>>>             >
>>>>>>             >
>>>>>>             >
>>>>>>             >
>>>>>>             > --
>>>>>>             > -- Guozhang
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>         --
>>>>>>         -- Guozhang
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>     --
>>>>>>     Florian HUSSONNOIS
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Florian HUSSONNOIS
>>>>>
>>>>
>>>>
>>>
>>
>>
> 
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to