Thanks for updating the KIP, Walker. About `timeCurrentIdlingStarted()`: should we return an `Optional` instead of `-1` if a task is not idling?
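To illustrate the `Optional` idea, here is a minimal sketch. `IdleTracker`, `startIdling()`, and `stopIdling()` are hypothetical names made up for this example and are not part of the KIP or of `TaskMetadata`; only the accessor name `timeCurrentIdlingStarted()` comes from the discussion.

```java
import java.util.Optional;

// Hypothetical stand-in for the idling field discussed in the KIP; not an actual Kafka Streams class.
final class IdleTracker {
    // Empty while the task is processing; holds the idling start time (ms) while the task idles.
    private Optional<Long> idlingStartedMs = Optional.empty();

    // Record when idling began; keeps the first timestamp if the task is already idling.
    void startIdling(long nowMs) {
        if (!idlingStartedMs.isPresent()) {
            idlingStartedMs = Optional.of(nowMs);
        }
    }

    // Clear the timestamp instead of overwriting it with a -1 sentinel.
    void stopIdling() {
        idlingStartedMs = Optional.empty();
    }

    // Callers must handle the "not idling" case explicitly.
    Optional<Long> timeCurrentIdlingStarted() {
        return idlingStartedMs;
    }
}
```

With this shape a health check could compute the idle duration as `tracker.timeCurrentIdlingStarted().map(start -> now - start)` and would not have to remember that `-1` means "not idling".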
As we allow calling `localThreadMetadata()` at any time, could it be that we report partial information during a rebalance? If yes, this should be pointed out, because anyone who wants to implement a health check needs to take it into account.

-Matthias

On 2/27/21 11:32 AM, Walker Carlson wrote:
> Sure thing Boyang,
>
> 1) it is in proposed changes. I expanded on it a bit more now.
> 2) done
> 3) and done :)
>
> thanks for the suggestions,
> walker
>
> On Fri, Feb 26, 2021 at 3:10 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
>
>> Thanks Walker. Some minor comments:
>>
>> 1. Could you add a reference to localThreadMetadata method in the KIP?
>> 2. Could you make the code block as a java template, such that TaskMetadata.java could be as the template title? Also it would be good to add some meta comments about the newly added functions.
>> 3. Could you write more details about rejected alternatives? Just as why we don't choose to expose as metrics, and how a new method on KStream is not favorable. These would be valuable when we look back on our design decisions.
>>
>> On Fri, Feb 26, 2021 at 11:23 AM Walker Carlson <wcarl...@confluent.io> wrote:
>>
>>> I understand now. I think that is a valid concern but I think it is best solved by having an external service verify through streams. As this KIP is now just adding fields to TaskMetadata to be returned in the threadMetadata I am going to say that is out of scope.
>>>
>>> That seems to be the last concern. If there are no others I will put this up for a vote soon.
>>>
>>> walker
>>>
>>> On Thu, Feb 25, 2021 at 12:35 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
>>>
>>>> For the 3rd point, yes, what I'm proposing is an edge case. For example, when we have 4 tasks [0_0, 0_1, 1_0, 1_1], and a bug in rebalancing logic causing no one gets 1_1 assigned.
>>>> Then the health check service will only see 3 tasks [0_0, 0_1, 1_0] reporting progress normally while not paying attention to 1_1. What I want to expose is a "logical global" view of all the tasks through the stream instance, since each instance gets the assigned topology and should be able to infer all the exact tasks to be up and running when the service is healthy.
>>>>
>>>> On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson <wcarl...@confluent.io> wrote:
>>>>
>>>>> Thanks for the follow up Boyang and Guozhang,
>>>>>
>>>>> I have updated the kip to include these ideas.
>>>>>
>>>>> Guozhang, that is a good idea about using the TaskMetadata. We can get it through the ThreadMetadata with a minor change to `localThreadMetadata` in kafkaStreams. This means that we will only need to update TaskMetadata and add no other APIs.
>>>>>
>>>>> Boyang, since each TaskMetadata contains the TaskId and TopicPartitions I don't believe mapping either way will be a problem. Also I think we can do something like record the time the task started idling and when it stops idling we can override it to -1. I think that should clear up the first two points.
>>>>>
>>>>> As for your third point I am not sure I 100% understand. The ThreadMetadata will contain a set of all tasks assigned to that thread. Any health check service will just need to query all clients and aggregate their responses to get a complete picture of all tasks, correct?
>>>>>
>>>>> walker
>>>>>
>>>>> On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>>> Regarding the second API and the `TaskStatus` class: I'd suggest we consolidate on the existing `TaskMetadata` since we have already accumulated a bunch of such classes, and it's better to keep them small as public APIs.
>>>>>> You can see https://issues.apache.org/jira/browse/KAFKA-12370 for a reference and a proposal.
>>>>>>
>>>>>> On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <reluctanthero...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for the updates Walker. Some replies and follow-up questions:
>>>>>>>
>>>>>>> 1. I agree one task could have multiple partitions, but when we hit a delay in terms of offset progress, do we have a convenient way to reverse mapping TopicPartition to the problematic task? In production, I believe it would be much quicker to identify the problem using task.id instead of topic partition, especially when it points to an internal topic. I think having the task id as part of the entry value seems useful, which means getting something like Map<TopicPartition, TaskProgress> where TaskProgress contains both committed offsets & task id.
>>>>>>>
>>>>>>> 2. The task idling API was still confusing. I don't think we care about the exact state when making tasksIdling() query, instead we care more about how long one task has been in idle state since when you called, which reflects whether it is a normal idling period. So I feel it might be helpful to track that time difference and report it in the TaskStatus struct.
>>>>>>>
>>>>>>> 3. What I want to achieve to have some global mapping of either TopicPartition or TaskId was that it is not possible for a health check service to report a task failure that doesn't emit any metrics. So as long as we have a global topic partition API, health check could always be aware of any task/partition not reporting its progress, does that make sense?
>>>>>>> If you feel we have a better way to achieve this, such as querying all the input/intermediate topic metadata directly from Kafka for the baseline, I think that should be good as well and worth mentioning it in the KIP.
>>>>>>>
>>>>>>> Also it seems that the KIP hasn't reflected what you proposed for the task idling status.
>>>>>>>
>>>>>>> Best,
>>>>>>> Boyang
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <wcarl...@confluent.io> wrote:
>>>>>>>
>>>>>>>> Thank you for the comments everyone!
>>>>>>>>
>>>>>>>> I think there are a few things I can clear up in general then I will specifically respond to each question.
>>>>>>>>
>>>>>>>> First, when I say "idling" I refer to task idling, where the stream is intentionally not making progress (https://issues.apache.org/jira/browse/KAFKA-10091 is an example). This becomes relevant if a task is waiting on one partition with no data but that is holding up a partition with data. That would cause one just looking at the committed offset changes to believe the task has a problem when it is working as intended.
>>>>>>>>
>>>>>>>> In light of this confusion, I plan to change tasksIdling() to `Map<TaskId, TaskStatus> getTasksStatus()`; this should hopefully make it more clear what is being exposed.
>>>>>>>>
>>>>>>>> TaskStatus would include: TopicPartitions, TaskId, ProcessorTopology, Idling, and State.
>>>>>>>>
>>>>>>>> Boyang:
>>>>>>>>
>>>>>>>> 2) I think that each task should report on whatever TopicPartitions they hold, this means a Topic Partition might get reported twice but the user can roll those up and use the larger one when looking at the whole app.
>>>>>>>>
>>>>>>>> 4) If the user collects the committed offsets across all the running clients there shouldn't be any tasks missing, correct?
>>>>>>>>
>>>>>>>> 6) Because there is not a 1:1 mapping between Tasks and TopicPartitions I think it is cleaner to report them separately.
>>>>>>>>
>>>>>>>> Guozhang:
>>>>>>>>
>>>>>>>> 1) Yes, that was my original plan but it made more sense to mirror how the consumer exposes the committed offset.
>>>>>>>>
>>>>>>>> 3) That is a good point. I think that we should include internal topics as well. I think that if the topology were to evolve there should be fair warning anyways. Maybe you can clarify what would be limited by exposing the interior topics here? I thought a user could find them in other ways. If it is the name we could anonymize them before exposing them.
>>>>>>>>
>>>>>>>> Thank you all for your comments. If I did not respond directly to one of your questions I updated the kip to include the details it was requesting. I did not include my proposed changes mentioned earlier as I would like to get some feedback about what to include in TaskStatus and in general.
>>>>>>>>
>>>>>>>> best,
>>>>>>>> Walker
>>>>>>>>
>>>>>>>> On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hello Walker, thanks for the KIP. A few thoughts:
>>>>>>>>>
>>>>>>>>> 1) Have you considered just relying on the `KafkaStreams#metrics()` that includes embedded consumer metrics that have the committed offsets instead of adding a new API? Not advocating that this is a better approach but want to make sure we considered all options before we come to the "last resort" of adding new public interfaces.
>>>>>>>>>
>>>>>>>>> 2) The javadoc mentions "tasks assigned to this client", but the returned map is on partitions. I think we should make the javadoc and the return types consistent, either tasks or topic partitions.
>>>>>>>>>
>>>>>>>>> 3) In addition, if for 2) above we ended up with topic partitions, then would they include only external source topics, or also including internal repartition / changelog topics? I think including only external source topic partitions are not sufficient for your goal of tracking progress, but exposing internal topic names are also a big commitment here for future topology evolution.
>>>>>>>>>
>>>>>>>>> 4) For "tasksIdling", I'm wondering if we can make it more general, that the returned value is not just a boolean, but a TaskState that can be an enum of "created, restoring, running, idle, closing". This could help us in the future to track other things like restoration efficiency and rebalance efficiency etc.
>>>>>>>>>
>>>>>>>>> 5) We need to clarify how is "idling" being defined here: e.g. we can clearly state that a task is considered idle only if 1) lag is increasing, indicating that there are indeed new records arrived at source, while committed offset is not advancing, AND 2) produced offset (imagine we may have punctuations that generate new data to the output topic even if there's no input for a while) is not advancing either.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Walker for the proposed KIP! This should definitely empower KStream users with better visibility.
>>>>>>>>>>
>>>>>>>>>> Meanwhile I got a couple of questions/suggestions:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 1. typo "repost/report" in the motivation section.
>>>>>>>>>>
>>>>>>>>>> 2. What offsets do we report when the task is under restoration or rebalancing?
>>>>>>>>>>
>>>>>>>>>> 3. IIUC, we should clearly state that our reported metrics are based off locally assigned tasks for each instance.
>>>>>>>>>>
>>>>>>>>>> 4. In the meantime, what’s our strategy to report tasks that are not local to the instance? Users would normally try to monitor all the possible tasks, and it’s unfortunate we couldn’t determine whether we have lost tasks. My brainstorming was whether it makes sense for the leader instance to report the task progress as -1 for all “supposed to be running” tasks, so that on the metrics collector side it could catch any missing tasks.
>>>>>>>>>>
>>>>>>>>>> 5. It seems not clear how users should use `isTaskIdling`. Why not report a map/set for idling tasks just as what we did for committed offsets?
>>>>>>>>>>
>>>>>>>>>> 6. Why do we use TopicPartition instead of TaskId as the key in the returned map?
>>>>>>>>>> 7. Could we include some details in where we got the commit offsets for each task? Is it through consumer offset fetch, or the stream processing progress based on the records fetched?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <wcarl...@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello all,
>>>>>>>>>>>
>>>>>>>>>>> I would like to start discussion on KIP-715. This kip aims to make it easier to monitor Kafka Streams progress by exposing the committed offset in a similar way as the consumer client does.
>>>>>>>>>>>
>>>>>>>>>>> Here is the KIP: https://cwiki.apache.org/confluence/x/aRRRCg
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Walker
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>
>>>
>>
>