Thanks for updating the KIP, Walker.

About `timeCurrentIdlingStarted()`: should we return an `Optional`
instead of `-1` if a task is not idling?
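
Something like the following sketch (assuming an internal `idleStartMs`
field; the name is illustrative only):

    public Optional<Long> timeCurrentIdlingStarted() {
        // empty means "not currently idling", instead of a -1 sentinel
        return idleStartMs == -1 ? Optional.empty() : Optional.of(idleStartMs);
    }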


As we allow calling `localThreadMetadata()` at any time, could it be that
we report partial information during a rebalance? If yes, this should be
pointed out, because anyone who wants to implement a health check needs
to take this into account.
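
For example, a health check probe might have to look like the following
sketch (committedOffsets() is the accessor proposed in this KIP; the
rest uses the existing API):

    import java.util.Optional;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.processor.TaskMetadata;
    import org.apache.kafka.streams.processor.ThreadMetadata;

    static Optional<Boolean> probeHealthy(final KafkaStreams streams) {
        if (streams.state() != KafkaStreams.State.RUNNING) {
            // during a rebalance the local metadata may be partial,
            // so report "unknown" rather than "unhealthy"
            return Optional.empty();
        }
        for (final ThreadMetadata thread : streams.localThreadsMetadata()) {
            for (final TaskMetadata task : thread.activeTasks()) {
                // inspect task.committedOffsets() and idling info here
            }
        }
        return Optional.of(true);
    }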

-Matthias


On 2/27/21 11:32 AM, Walker Carlson wrote:
> Sure thing Boyang,
> 
> 1) It is in the proposed changes section. I expanded on it a bit more now.
> 2) done
> 3) and done :)
> 
> thanks for the suggestions,
> walker
> 
> On Fri, Feb 26, 2021 at 3:10 PM Boyang Chen <reluctanthero...@gmail.com>
> wrote:
> 
>> Thanks Walker. Some minor comments:
>>
>> 1. Could you add a reference to the localThreadMetadata method in the KIP?
>> 2. Could you format the code block as a Java template, such that
>> TaskMetadata.java could be the template title? Also, it would be good to
>> add some meta comments about the newly added functions.
>> 3. Could you write more details about the rejected alternatives? Such as
>> why we don't choose to expose this as metrics, and why a new method on
>> KStream is not favorable. These would be valuable when we look back on our
>> design decisions.
>>
>> On Fri, Feb 26, 2021 at 11:23 AM Walker Carlson <wcarl...@confluent.io>
>> wrote:
>>
>>> I understand now. I think that is a valid concern, but I think it is best
>>> solved by having an external service verify through Streams. As this KIP
>>> is now just adding fields to TaskMetadata to be returned in the
>>> ThreadMetadata, I am going to say that is out of scope.
>>>
>>> That seems to be the last concern. If there are no others I will put this
>>> up for a vote soon.
>>>
>>> walker
>>>
>>> On Thu, Feb 25, 2021 at 12:35 PM Boyang Chen <reluctanthero...@gmail.com>
>>> wrote:
>>>
>>>> For the 3rd point, yes, what I'm proposing is an edge case. For
>>>> example, when we have 4 tasks [0_0, 0_1, 1_0, 1_1], a bug in the
>>>> rebalancing logic could cause no one to get 1_1 assigned. Then the
>>>> health check service will only see 3 tasks [0_0, 0_1, 1_0] reporting
>>>> progress normally, while not paying attention to 1_1. What I want to
>>>> expose is a "logical global" view of all the tasks through the stream
>>>> instance, since each instance gets the assigned topology and should be
>>>> able to infer the exact set of tasks that must be up and running when
>>>> the service is healthy.
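>>>>
>>>> For instance, the health check could diff the expected task set
>>>> (derived from the assigned topology) against what is actually
>>>> reported; a sketch, where expectedTasks and reportedTasks are
>>>> hypothetical inputs:
>>>>
>>>>     Set<TaskId> missing = new HashSet<>(expectedTasks); // {0_0, 0_1, 1_0, 1_1}
>>>>     missing.removeAll(reportedTasks);                   // {0_0, 0_1, 1_0}
>>>>     // missing == {1_1} is exactly what the check should flag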
>>>>
>>>> On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson <wcarl...@confluent.io>
>>>> wrote:
>>>>
>>>>> Thanks for the follow-up, Boyang and Guozhang,
>>>>>
>>>>> I have updated the KIP to include these ideas.
>>>>>
>>>>> Guozhang, that is a good idea about using the TaskMetadata. We can
>>>>> get it through the ThreadMetadata with a minor change to
>>>>> `localThreadMetadata` in KafkaStreams. This means that we will only
>>>>> need to update TaskMetadata and add no other APIs.
>>>>>
>>>>> Boyang, since each TaskMetadata contains the TaskId and
>>>>> TopicPartitions, I don't believe mapping either way will be a
>>>>> problem. Also, I think we can do something like record the time the
>>>>> task started idling, and when it stops idling we can reset it to -1.
>>>>> I think that should clear up the first two points.
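>>>>>
>>>>> A rough sketch of that bookkeeping (names are illustrative only):
>>>>>
>>>>>     private volatile long idleStartMs = -1L; // -1 == not idling
>>>>>
>>>>>     void startIdling() { idleStartMs = System.currentTimeMillis(); }
>>>>>
>>>>>     void stopIdling() { idleStartMs = -1L; }
>>>>>
>>>>>     // exposed through TaskMetadata
>>>>>     public long timeCurrentIdlingStarted() { return idleStartMs; }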
>>>>>
>>>>> As for your third point, I am not sure I 100% understand. The
>>>>> ThreadMetadata will contain a set of all tasks assigned to that
>>>>> thread. Any health check service will just need to query all clients
>>>>> and aggregate their responses to get a complete picture of all
>>>>> tasks, correct?
>>>>>
>>>>> walker
>>>>>
>>>>> On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <wangg...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Regarding the second API and the `TaskStatus` class: I'd suggest we
>>>>>> consolidate on the existing `TaskMetadata`, since we have already
>>>>>> accumulated a bunch of such classes, and it's better to keep them
>>>>>> small as public APIs. You can see
>>>>>> https://issues.apache.org/jira/browse/KAFKA-12370
>>>>>> for a reference and a proposal.
>>>>>>
>>>>>> On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <reluctanthero...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the updates Walker. Some replies and follow-up
>>>>>>> questions:
>>>>>>>
>>>>>>> 1. I agree one task could have multiple partitions, but when we
>>>>>>> hit a delay in terms of offset progress, do we have a convenient
>>>>>>> way to reverse-map a TopicPartition to the problematic task? In
>>>>>>> production, I believe it would be much quicker to identify the
>>>>>>> problem using the task.id instead of the topic partition,
>>>>>>> especially when it points to an internal topic. Having the task id
>>>>>>> as part of the entry value seems useful, which means returning
>>>>>>> something like Map<TopicPartition, TaskProgress>, where
>>>>>>> TaskProgress contains both the committed offset & the task id.
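>>>>>>>
>>>>>>> A sketch of what I have in mind (TaskProgress is a hypothetical
>>>>>>> name, not something in the current proposal):
>>>>>>>
>>>>>>>     public final class TaskProgress {
>>>>>>>         private final TaskId taskId;
>>>>>>>         private final long committedOffset;
>>>>>>>
>>>>>>>         public TaskProgress(final TaskId taskId,
>>>>>>>                             final long committedOffset) {
>>>>>>>             this.taskId = taskId;
>>>>>>>             this.committedOffset = committedOffset;
>>>>>>>         }
>>>>>>>
>>>>>>>         public TaskId taskId() { return taskId; }
>>>>>>>
>>>>>>>         public long committedOffset() { return committedOffset; }
>>>>>>>     }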
>>>>>>>
>>>>>>> 2. The task idling API is still confusing. I don't think we care
>>>>>>> about the exact state when making the tasksIdling() query; instead
>>>>>>> we care more about how long a task has been in the idle state at
>>>>>>> the time of the call, which reflects whether it is a normal idling
>>>>>>> period. So I feel it might be helpful to track that time
>>>>>>> difference and report it in the TaskStatus struct.
>>>>>>>
>>>>>>> 3. What I wanted to achieve with a global mapping of either
>>>>>>> TopicPartition or TaskId was to make it impossible for a health
>>>>>>> check service to miss a task failure just because that task
>>>>>>> doesn't emit any metrics. As long as we have a global topic
>>>>>>> partition API, the health check could always be aware of any
>>>>>>> task/partition not reporting its progress; does that make sense?
>>>>>>> If you feel we have a better way to achieve this, such as querying
>>>>>>> all the input/intermediate topic metadata directly from Kafka for
>>>>>>> the baseline, I think that should be good as well and worth
>>>>>>> mentioning in the KIP.
>>>>>>>
>>>>>>> Also, it seems that the KIP hasn't reflected what you proposed for
>>>>>>> the task idling status.
>>>>>>>
>>>>>>> Best,
>>>>>>> Boyang
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <wcarl...@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thank you for the comments everyone!
>>>>>>>>
>>>>>>>> I think there are a few things I can clear up in general, and then
>>>>>>>> I will respond to each question specifically.
>>>>>>>>
>>>>>>>> First, when I say "idling" I refer to task idling, where the
>>>>>>>> stream is intentionally not making progress
>>>>>>>> (https://issues.apache.org/jira/browse/KAFKA-10091 is an example).
>>>>>>>> This becomes relevant if a task is waiting on one partition with
>>>>>>>> no data, and that is holding up a partition with data. Someone
>>>>>>>> looking only at the committed offset changes would then believe
>>>>>>>> the task has a problem, when it is working as intended.
>>>>>>>>
>>>>>>>> In light of this confusion, I plan to change tasksIdling() to
>>>>>>>> `Map<TaskId, TaskStatus> getTasksStatus()`; this should hopefully
>>>>>>>> make it clearer what is being exposed.
>>>>>>>>
>>>>>>>> TaskStatus would include: TopicPartitions, TaskId,
>>>>>>>> ProcessorTopology, Idling, and State.
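>>>>>>>>
>>>>>>>> Roughly like this (a sketch only; the exact fields are what I
>>>>>>>> would like feedback on):
>>>>>>>>
>>>>>>>>     public final class TaskStatus {
>>>>>>>>         private final TaskId taskId;
>>>>>>>>         private final Set<TopicPartition> topicPartitions;
>>>>>>>>         private final boolean idling;
>>>>>>>>         private final String state;
>>>>>>>>         // plus some representation of the ProcessorTopology
>>>>>>>>
>>>>>>>>         TaskStatus(final TaskId taskId,
>>>>>>>>                    final Set<TopicPartition> topicPartitions,
>>>>>>>>                    final boolean idling,
>>>>>>>>                    final String state) {
>>>>>>>>             this.taskId = taskId;
>>>>>>>>             this.topicPartitions = topicPartitions;
>>>>>>>>             this.idling = idling;
>>>>>>>>             this.state = state;
>>>>>>>>         }
>>>>>>>>     }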
>>>>>>>>
>>>>>>>> Boyang:
>>>>>>>>
>>>>>>>> 2) I think that each task should report on whatever
>>>>>>>> TopicPartitions it holds. This means a TopicPartition might get
>>>>>>>> reported twice, but the user can roll those up and use the larger
>>>>>>>> offset when looking at the whole app.
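>>>>>>>>
>>>>>>>> For example, rolling up the per-client maps (a sketch, where
>>>>>>>> allClientOffsets is the hypothetical collection gathered from
>>>>>>>> each client):
>>>>>>>>
>>>>>>>>     Map<TopicPartition, Long> merged = new HashMap<>();
>>>>>>>>     for (final Map<TopicPartition, Long> perClient : allClientOffsets) {
>>>>>>>>         perClient.forEach((tp, offset) ->
>>>>>>>>             merged.merge(tp, offset, Math::max));
>>>>>>>>     }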
>>>>>>>>
>>>>>>>> 4) If the user collects the committed offsets across all the
>>>>>>>> running clients, there shouldn't be any tasks missing, correct?
>>>>>>>>
>>>>>>>> 6) Because there is not a 1:1 mapping between Tasks and
>>>>>>>> TopicPartitions, I think it is cleaner to report them separately.
>>>>>>>>
>>>>>>>> Guozhang:
>>>>>>>>
>>>>>>>> 1) Yes, that was my original plan, but it made more sense to
>>>>>>>> mirror how the consumer exposes the committed offset.
>>>>>>>>
>>>>>>>> 3) That is a good point. I think that we should include internal
>>>>>>>> topics as well. I think that if the topology were to evolve there
>>>>>>>> should be fair warning anyway. Maybe you can clarify what would be
>>>>>>>> limited by exposing the internal topics here? I thought a user
>>>>>>>> could find them in other ways. If it is the names that concern
>>>>>>>> you, we could anonymize them before exposing them.
>>>>>>>>
>>>>>>>> Thank you all for your comments. If I did not respond directly to
>>>>>>>> one of your questions, I updated the KIP to include the details it
>>>>>>>> was requesting. I did not include my proposed changes mentioned
>>>>>>>> earlier, as I would like to get some feedback about what to
>>>>>>>> include in TaskStatus and in general.
>>>>>>>>
>>>>>>>> best,
>>>>>>>> Walker
>>>>>>>>
>>>>>>>> On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <wangg...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello Walker, thanks for the KIP. A few thoughts:
>>>>>>>>>
>>>>>>>>> 1) Have you considered just relying on the existing
>>>>>>>>> `KafkaStreams#metrics()`, which includes the embedded consumer
>>>>>>>>> metrics that carry the committed offsets, instead of adding a new
>>>>>>>>> API? I'm not advocating that this is a better approach, but I
>>>>>>>>> want to make sure we consider all options before we come to the
>>>>>>>>> "last resort" of adding new public interfaces.
>>>>>>>>>
>>>>>>>>> 2) The javadoc mentions "tasks assigned to this client", but the
>>>>>>>>> returned map is keyed on partitions. I think we should make the
>>>>>>>>> javadoc and the return types consistent: either tasks or topic
>>>>>>>>> partitions.
>>>>>>>>>
>>>>>>>>> 3) In addition, if for 2) above we end up with topic partitions,
>>>>>>>>> would they include only external source topics, or also internal
>>>>>>>>> repartition / changelog topics? I think including only external
>>>>>>>>> source topic partitions is not sufficient for your goal of
>>>>>>>>> tracking progress, but exposing internal topic names is also a
>>>>>>>>> big commitment here for future topology evolution.
>>>>>>>>>
>>>>>>>>> 4) For "tasksIdling", I'm wondering if we can make it more
>>>>>>>>> general, so that the returned value is not just a boolean but a
>>>>>>>>> TaskState, an enum of "created, restoring, running, idle,
>>>>>>>>> closing". This could help us in the future to track other things
>>>>>>>>> like restoration efficiency and rebalance efficiency, etc.
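>>>>>>>>>
>>>>>>>>> I.e., something like (sketch):
>>>>>>>>>
>>>>>>>>>     public enum TaskState {
>>>>>>>>>         CREATED, RESTORING, RUNNING, IDLE, CLOSING
>>>>>>>>>     }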
>>>>>>>>>
>>>>>>>>> 5) We need to clarify how "idling" is defined here. E.g., we can
>>>>>>>>> clearly state that a task is considered idle only if 1) the lag
>>>>>>>>> is increasing, indicating that there are indeed new records
>>>>>>>>> arriving at the source, while the committed offset is not
>>>>>>>>> advancing, AND 2) the produced offset (imagine we may have
>>>>>>>>> punctuations that generate new data to the output topic even if
>>>>>>>>> there's no input for a while) is not advancing either.
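>>>>>>>>>
>>>>>>>>> As a predicate over deltas sampled between two consecutive
>>>>>>>>> checks, roughly (a sketch):
>>>>>>>>>
>>>>>>>>>     // idle := new input is arriving, but neither consumption
>>>>>>>>>     // nor production is making progress
>>>>>>>>>     static boolean isIdle(final long lagDelta,
>>>>>>>>>                           final long committedOffsetDelta,
>>>>>>>>>                           final long producedOffsetDelta) {
>>>>>>>>>         return lagDelta > 0
>>>>>>>>>             && committedOffsetDelta == 0
>>>>>>>>>             && producedOffsetDelta == 0;
>>>>>>>>>     }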
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <reluctanthero...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Walker for the proposed KIP! This should definitely
>>>>>>>>>> empower KStream users with better visibility.
>>>>>>>>>>
>>>>>>>>>> Meanwhile I got a couple of questions/suggestions:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 1. typo "repost/report" in the motivation section.
>>>>>>>>>>
>>>>>>>>>> 2. What offsets do we report when the task is under restoration
>>>>>>>>>> or rebalancing?
>>>>>>>>>>
>>>>>>>>>> 3. IIUC, we should clearly state that our reported metrics are
>>>>>>>>>> based on the locally assigned tasks of each instance.
>>>>>>>>>>
>>>>>>>>>> 4. In the meantime, what's our strategy to report tasks that
>>>>>>>>>> are not local to the instance? Users would normally try to
>>>>>>>>>> monitor all the possible tasks, and it's unfortunate if we
>>>>>>>>>> can't determine whether we have lost tasks. My brainstorming
>>>>>>>>>> was whether it makes sense for the leader instance to report
>>>>>>>>>> the task progress as -1 for all "supposed to be running" tasks,
>>>>>>>>>> so that the metrics collector side could catch any missing
>>>>>>>>>> tasks.
>>>>>>>>>>
>>>>>>>>>> 5. It is not clear how users should use `isTaskIdling`. Why not
>>>>>>>>>> report a map/set of idling tasks, just as we did for the
>>>>>>>>>> committed offsets?
>>>>>>>>>>
>>>>>>>>>> 6. Why do we use TopicPartition instead of TaskId as the key in
>>>>>>>>>> the returned map?
>>>>>>>>>> 7. Could we include some details on where we get the committed
>>>>>>>>>> offsets for each task? Is it through a consumer offset fetch, or
>>>>>>>>>> through the stream's processing progress based on the records
>>>>>>>>>> fetched?
>>>>>>>>>> progress based on the records fetched?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <wcarl...@confluent.io>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello all,
>>>>>>>>>>>
>>>>>>>>>>> I would like to start discussion on KIP-715. This KIP aims to
>>>>>>>>>>> make it easier to monitor Kafka Streams progress by exposing
>>>>>>>>>>> the committed offset in a similar way as the consumer client
>>>>>>>>>>> does.
>>>>>>>>>>>
>>>>>>>>>>> Here is the KIP: https://cwiki.apache.org/confluence/x/aRRRCg
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Walker
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>
>>>
>>
> 
