I updated to use Optional, good idea Mathias.

For the localThreadMetadata, it could already be called running a
rebalance. Also I mention that they return the highest value they had seen
so far for any tasks they have assigned to them. I thought it would be
useful to see the TaskMetadata while the Threads were shutting down. I
think that there shouldn't really be partial information. If you think this
should be clarified better let me know.

walker

On Mon, Mar 1, 2021 at 3:45 PM Sophie Blee-Goldman <sop...@confluent.io>
wrote:

> Can you clarify your second question Matthias? If this is queried during
> a cooperative rebalance, it should return the tasks as usual. If the user
> is
> using eager rebalancing then this will not return any tasks, but the user
> should
> not rely on all tasks being returned at any given time to begin with since
> it's
> possible we are in between revoking and re-assigning a partition.
>
> What does "partial information" mean?
>
> (btw I agree that an Optional makes sense for timeCurrentIdlingStarted())
>
> On Mon, Mar 1, 2021 at 11:46 AM Matthias J. Sax <mj...@apache.org> wrote:
>
> > Thanks the updating the KIP Walker.
> >
> > About, `timeCurrentIdlingStarted()`: should we return an `Optional`
> > instead of `-1` if a task is not idling.
> >
> >
> > As we allow to call `localThreadMetadata()` any time, could it be that
> > we report partial information during a rebalance? If yes, this should be
> > pointed out, because if one want to implement a health check this needs
> > to be taken into account.
> >
> > -Matthias
> >
> >
> > On 2/27/21 11:32 AM, Walker Carlson wrote:
> > > Sure thing Boyang,
> > >
> > > 1) it is in proposed changes. I expanded on it a bit more now.
> > > 2) done
> > > 3) and done :)
> > >
> > > thanks for the suggestions,
> > > walker
> > >
> > > On Fri, Feb 26, 2021 at 3:10 PM Boyang Chen <
> reluctanthero...@gmail.com>
> > > wrote:
> > >
> > >> Thanks Walker. Some minor comments:
> > >>
> > >> 1. Could you add a reference to localThreadMetadata method in the KIP?
> > >> 2. Could you make the code block as a java template, such that
> > >> TaskMetadata.java could be as the template title? Also it would be
> good
> > to
> > >> add some meta comments about the newly added functions.
> > >> 3. Could you write more details about rejected alternatives? Just as
> > why we
> > >> don't choose to expose as metrics, and how a new method on KStream is
> > not
> > >> favorable. These would be valuable when we look back on our design
> > >> decisions.
> > >>
> > >> On Fri, Feb 26, 2021 at 11:23 AM Walker Carlson <
> wcarl...@confluent.io>
> > >> wrote:
> > >>
> > >>> I understand now. I think that is a valid concern but I think it is
> > best
> > >>> solved but having an external service verify through streams. As this
> > KIP
> > >>> is now just adding fields to TaskMetadata to be returned in the
> > >>> threadMetadata I am going to say that is out of scope.
> > >>>
> > >>> That seems to be the last concern. If there are no others I will put
> > this
> > >>> up for a vote soon.
> > >>>
> > >>> walker
> > >>>
> > >>> On Thu, Feb 25, 2021 at 12:35 PM Boyang Chen <
> > reluctanthero...@gmail.com
> > >>>
> > >>> wrote:
> > >>>
> > >>>> For the 3rd point, yes, what I'm proposing is an edge case. For
> > >> example,
> > >>>> when we have 4 tasks [0_0, 0_1, 1_0, 1_1], and a bug in rebalancing
> > >> logic
> > >>>> causing no one gets 1_1 assigned. Then the health check service will
> > >> only
> > >>>> see 3 tasks [0_0, 0_1, 1_0] reporting progress normally while not
> > >> paying
> > >>>> attention to 1_1. What I want to expose is a "logical global" view
> of
> > >> all
> > >>>> the tasks through the stream instance, since each instance gets the
> > >>>> assigned topology and should be able to infer all the exact tasks to
> > be
> > >>> up
> > >>>> and running when the service is healthy.
> > >>>>
> > >>>> On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson <
> > wcarl...@confluent.io
> > >>>
> > >>>> wrote:
> > >>>>
> > >>>>> Thanks for the follow up Boyang and Guozhang,
> > >>>>>
> > >>>>> I have updated the kip to include these ideas.
> > >>>>>
> > >>>>> Guozhang, that is a good idea about using the TaskMetadata. We can
> > >> get
> > >>> it
> > >>>>> through the ThreadMetadata with a minor change to
> > >> `localThreadMetadata`
> > >>>> in
> > >>>>> kafkaStreams. This means that we will only need to update
> > >> TaskMetadata
> > >>>> and
> > >>>>> add no other APIs
> > >>>>>
> > >>>>> Boyang, since each TaskMetadata contains the TaskId and
> > >>> TopicPartitions I
> > >>>>> don't believe mapping either way will be a problem. Also I think we
> > >> can
> > >>>> do
> > >>>>> something like record the time the task started idling and when it
> > >>> stops
> > >>>>> idling we can override it to -1. I think that should clear up the
> > >> first
> > >>>> two
> > >>>>> points.
> > >>>>>
> > >>>>> As for your third point I am not sure I 100% understand. The
> > >>>> ThreadMetadata
> > >>>>> will contain a set of all task assigned to that thread. Any health
> > >>> check
> > >>>>> service will just need to query all clients and aggregate their
> > >>> responses
> > >>>>> to get a complete picture of all tasks correct?
> > >>>>>
> > >>>>> walker
> > >>>>>
> > >>>>> On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <wangg...@gmail.com>
> > >>>> wrote:
> > >>>>>
> > >>>>>> Regarding the second API and the `TaskStatus` class: I'd suggest
> we
> > >>>>>> consolidate on the existing `TaskMetadata` since we have already
> > >>>>>> accumulated a bunch of such classes, and its better to keep them
> > >>> small
> > >>>> as
> > >>>>>> public APIs. You can see
> > >>>>> https://issues.apache.org/jira/browse/KAFKA-12370
> > >>>>>> for a reference and a proposal.
> > >>>>>>
> > >>>>>> On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <
> > >>>> reluctanthero...@gmail.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Thanks for the updates Walker. Some replies and follow-up
> > >>> questions:
> > >>>>>>>
> > >>>>>>> 1. I agree one task could have multiple partitions, but when we
> > >>> hit a
> > >>>>>> delay
> > >>>>>>> in terms of offset progress, do we have a convenient way to
> > >> reverse
> > >>>>>> mapping
> > >>>>>>> TopicPartition to the problematic task? In production, I believe
> > >> it
> > >>>>> would
> > >>>>>>> be much quicker to identify the problem using task.id instead of
> > >>>> topic
> > >>>>>>> partition, especially when it points to an internal topic. I
> > >> think
> > >>>>> having
> > >>>>>>> the task id as part of the entry value seems useful, which means
> > >>>>> getting
> > >>>>>>> something like Map<TopicPartition, TaskProgress> where
> > >> TaskProgress
> > >>>>>>> contains both committed offsets & task id.
> > >>>>>>>
> > >>>>>>> 2. The task idling API was still confusing. I don't think we care
> > >>>> about
> > >>>>>> the
> > >>>>>>> exact state when making tasksIdling()query, instead we care more
> > >>>> about
> > >>>>>> how
> > >>>>>>> long one task has been in idle state since when you called, which
> > >>>>>> reflects
> > >>>>>>> whether it is a normal idling period. So I feel it might be
> > >> helpful
> > >>>> to
> > >>>>>>> track that time difference and report it in the TaskStatus
> > >> struct.
> > >>>>>>>
> > >>>>>>> 3. What I want to achieve to have some global mapping of either
> > >>>>>>> TopicPartition or TaskId was that it is not possible for a health
> > >>>> check
> > >>>>>>> service to report a task failure that doesn't emit any metrics.
> > >> So
> > >>> as
> > >>>>>> long
> > >>>>>>> as we have a global topic partition API, health check could
> > >> always
> > >>> be
> > >>>>>> aware
> > >>>>>>> of any task/partition not reporting its progress, does that make
> > >>>> sense?
> > >>>>>> If
> > >>>>>>> you feel we have a better way to achieve this, such as querying
> > >> all
> > >>>> the
> > >>>>>>> input/intermediate topic metadata directly from Kafka for the
> > >>>>> baseline, I
> > >>>>>>> think that should be good as well and worth mentioning it in the
> > >>> KIP.
> > >>>>>>>
> > >>>>>>> Also it seems that the KIP hasn't reflected what you proposed for
> > >>> the
> > >>>>>> task
> > >>>>>>> idling status.
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Boyang
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <
> > >>>> wcarl...@confluent.io>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Thank you for the comments everyone!
> > >>>>>>>>
> > >>>>>>>> I think there are a few things I can clear up in general then I
> > >>>> will
> > >>>>>>>> specifically respond to each question.
> > >>>>>>>>
> > >>>>>>>> First, when I say "idling" I refer to task idling. Where the
> > >>> stream
> > >>>>> is
> > >>>>>>>> intentionally not making progress. (
> > >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-10091 is an
> > >>> example).
> > >>>>> This
> > >>>>>>>> becomes relevant if a task is waiting on one partition with no
> > >>> data
> > >>>>> but
> > >>>>>>>> that is holding up a partition with data. That would cause one
> > >>> just
> > >>>>>>> looking
> > >>>>>>>> at the committed offset changes to believe the task has a
> > >> problem
> > >>>>> when
> > >>>>>> it
> > >>>>>>>> is working as intended.
> > >>>>>>>>
> > >>>>>>>> In light of this confusion. I plan to change tasksIdling() to
> > >>>>>>> `Map<TaskId,
> > >>>>>>>> TaskStatus> getTasksStatus()` this should hopefully make it
> > >> more
> > >>>>> clear
> > >>>>>>> what
> > >>>>>>>> is being exposed.
> > >>>>>>>>
> > >>>>>>>> TaskStatus would include: TopicPartions, TaskId,
> > >>> ProcessorTopology,
> > >>>>>>> Idling,
> > >>>>>>>> and State.
> > >>>>>>>>
> > >>>>>>>> Boyang:
> > >>>>>>>>
> > >>>>>>>> 2) I think that each task should report on whatever
> > >>> TopicPartitions
> > >>>>>> they
> > >>>>>>>> hold, this means a Topic Partition might get reported twice but
> > >>> the
> > >>>>>> user
> > >>>>>>>> can roll those up and use the larger one when looking at the
> > >>> whole
> > >>>>> app.
> > >>>>>>>>
> > >>>>>>>> 4) If the user collects the committed offsets across all the
> > >>>> running
> > >>>>>>>> clients there shouldn't be any tasks missing correct?
> > >>>>>>>>
> > >>>>>>>> 6) Because there is not a 1:1 mapping between Tasks and
> > >>>>>> TopicPartitions I
> > >>>>>>>> think it is cleaner to report them separately.
> > >>>>>>>>
> > >>>>>>>> Guozhang:
> > >>>>>>>>
> > >>>>>>>> 1) Yes, that was my original plan but it made more sense to
> > >>> mirror
> > >>>>> how
> > >>>>>>> the
> > >>>>>>>> consumer exposes the committed offset.
> > >>>>>>>>
> > >>>>>>>> 3) That is a good point. I think that we should include
> > >> internal
> > >>>>> topics
> > >>>>>>> as
> > >>>>>>>> well. I think that if the topology were to evolve there should
> > >> be
> > >>>>> fair
> > >>>>>>>> warning anyways. Maybe you can clarify what would be limited by
> > >>>>>> exposing
> > >>>>>>>> the interior topics here? I thought a user could find them in
> > >>> other
> > >>>>>> ways.
> > >>>>>>>> If it is the name we could aynomise them before exposing them.
> > >>>>>>>>
> > >>>>>>>> Thank you all for your comments. If I did not respond directly
> > >> to
> > >>>> one
> > >>>>>> of
> > >>>>>>>> your questions I updated the kip to include the details it was
> > >>>>>>> requesting.
> > >>>>>>>> I didn't not include my proposed changes mentioned earlier as I
> > >>>> would
> > >>>>>>> like
> > >>>>>>>> to get some feedback about what to include in TaskStatus and in
> > >>>>>> general.
> > >>>>>>>>
> > >>>>>>>> best,
> > >>>>>>>> Walker
> > >>>>>>>>
> > >>>>>>>> On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <
> > >>> wangg...@gmail.com
> > >>>>>
> > >>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hello Walker, thanks for the KIP. A few thoughts:
> > >>>>>>>>>
> > >>>>>>>>> 1) Have you considered just relying on the
> > >>>> `KafkaStreams#metrics()`
> > >>>>>>> that
> > >>>>>>>>> includes embedded consumer metrics that have the committed
> > >>>> offsets
> > >>>>>>>>> instead of adding a new API? Not advocating that this is a
> > >>> better
> > >>>>>>>> approach
> > >>>>>>>>> but want to make sure we considered all options before we
> > >> come
> > >>> to
> > >>>>> the
> > >>>>>>>> "last
> > >>>>>>>>> resort" of adding new public interfaces.
> > >>>>>>>>>
> > >>>>>>>>> 2) The javadoc mentions "tasks assigned to this client", but
> > >>> the
> > >>>>>>> returned
> > >>>>>>>>> map is on partitions. I think we should make the javadoc and
> > >>> the
> > >>>>>> return
> > >>>>>>>>> types consistent, either tasks or topic partitions.
> > >>>>>>>>>
> > >>>>>>>>> 3) In addition, if for 2) above we ended up with topic
> > >>>> partitions,
> > >>>>>> then
> > >>>>>>>>> would they include only external source topics, or also
> > >>> including
> > >>>>>>>> internal
> > >>>>>>>>> repartition / changelog topics? I think including only
> > >> external
> > >>>>>> source
> > >>>>>>>>> topic partitions are not sufficient for your goal of tracking
> > >>>>>> progress,
> > >>>>>>>> but
> > >>>>>>>>> exposing internal topic names are also a big commitment here
> > >>> for
> > >>>>>> future
> > >>>>>>>>> topology evolution.
> > >>>>>>>>>
> > >>>>>>>>> 4)  For "tasksIdling", I'm wondering if we can make it more
> > >>>>> general,
> > >>>>>>> that
> > >>>>>>>>> the returned value is not just a boolean, but a TaskState
> > >> that
> > >>>> can
> > >>>>> be
> > >>>>>>> an
> > >>>>>>>>> enum of "created, restoring, running, idle, closing". This
> > >>> could
> > >>>>> help
> > >>>>>>> us
> > >>>>>>>> in
> > >>>>>>>>> the future to track other things like restoration efficiency
> > >>> and
> > >>>>>>>> rebalance
> > >>>>>>>>> efficiency etc.
> > >>>>>>>>>
> > >>>>>>>>> 5) We need to clarify how is "idling" being defined here:
> > >> e.g.
> > >>> we
> > >>>>> can
> > >>>>>>>>> clearly state that a task is considered idle only if 1) lag
> > >> is
> > >>>>>>>>> increasing, indicating that there are indeed new records
> > >>> arrived
> > >>>> at
> > >>>>>>>> source,
> > >>>>>>>>> while committed offset is not advancing, AND 2) produced
> > >> offset
> > >>>>>>> (imagine
> > >>>>>>>> we
> > >>>>>>>>> may have punctuations that generate new data to the output
> > >>> topic
> > >>>>> even
> > >>>>>>> if
> > >>>>>>>>> there's no input for a while) is not advancing either.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Guozhang
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <
> > >>>>>>> reluctanthero...@gmail.com>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Thanks Walker for the proposed KIP! This should definitely
> > >>>>> empower
> > >>>>>>>>> KStream
> > >>>>>>>>>> users with better visibility.
> > >>>>>>>>>>
> > >>>>>>>>>> Meanwhile I got a couple of questions/suggestions:
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> 1. typo "repost/report" in the motivation section.
> > >>>>>>>>>>
> > >>>>>>>>>> 2. What offsets do we report when the task is under
> > >>> restoration
> > >>>>> or
> > >>>>>>>>>> rebalancing?
> > >>>>>>>>>>
> > >>>>>>>>>> 3. IIUC, we should clearly state that our reported metrics
> > >>> are
> > >>>>>> based
> > >>>>>>>> off
> > >>>>>>>>>> locally assigned tasks for each instance.
> > >>>>>>>>>>
> > >>>>>>>>>> 4. In the meantime, what’s our strategy to report tasks
> > >> that
> > >>>> are
> > >>>>>> not
> > >>>>>>>>> local
> > >>>>>>>>>> to the instance? Users would normally try to monitor all
> > >> the
> > >>>>>> possible
> > >>>>>>>>>> tasks, and it’s unfortunate we couldn’t determine whether
> > >> we
> > >>>> have
> > >>>>>>> lost
> > >>>>>>>>>> tasks. My brainstorming was whether it makes sense for the
> > >>>> leader
> > >>>>>>>>> instance
> > >>>>>>>>>> to report the task progress as -1 for all “supposed to be
> > >>>>> running”
> > >>>>>>>> tasks,
> > >>>>>>>>>> so that on the metrics collector side it could catch any
> > >>>> missing
> > >>>>>>> tasks.
> > >>>>>>>>>>
> > >>>>>>>>>> 5. It seems not clear how users should use `isTaskIdling`.
> > >>> Why
> > >>>>> not
> > >>>>>>>>> report a
> > >>>>>>>>>> map/set for idling tasks just as what we did for committed
> > >>>>> offsets?
> > >>>>>>>>>>
> > >>>>>>>>>> 6. Why do we use TopicPartition instead of TaskId as the
> > >> key
> > >>> in
> > >>>>> the
> > >>>>>>>>>> returned map?
> > >>>>>>>>>> 7. Could we include some details in where we got the commit
> > >>>>> offsets
> > >>>>>>> for
> > >>>>>>>>>> each task? Is it through consumer offset fetch, or the
> > >> stream
> > >>>>>>>> processing
> > >>>>>>>>>> progress based on the records fetched?
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <
> > >>>>>>> wcarl...@confluent.io>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hello all,
> > >>>>>>>>>>>
> > >>>>>>>>>>> I would like to start discussion on KIP-715. This kip
> > >> aims
> > >>> to
> > >>>>>> make
> > >>>>>>> it
> > >>>>>>>>>>> easier to monitor Kafka Streams progress by exposing the
> > >>>>>> committed
> > >>>>>>>>> offset
> > >>>>>>>>>>> in a similar way as the consumer client does.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Here is the KIP:
> > >>>> https://cwiki.apache.org/confluence/x/aRRRCg
> > >>>>>>>>>>>
> > >>>>>>>>>>> Best,
> > >>>>>>>>>>> Walker
> > >>>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> --
> > >>>>>>>>> -- Guozhang
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> --
> > >>>>>> -- Guozhang
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
> >
>

Reply via email to