This is an automated email from the ASF dual-hosted git repository.

klesh pushed a commit to branch release-v1.0
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v1.0 by this push:
     new 7beae1830 fix: add pagination to job_collector task (#8240)
7beae1830 is described below

commit 7beae1830d053e85b72cc87fece6b3a52b73f8c6
Author: Claudio Mascaro <[email protected]>
AuthorDate: Tue Dec 17 23:39:45 2024 -0300

    fix: add pagination to job_collector task (#8240)
---
 .../plugins/github_graphql/tasks/job_collector.go  | 73 +++++++++++++++-------
 1 file changed, 50 insertions(+), 23 deletions(-)

diff --git a/backend/plugins/github_graphql/tasks/job_collector.go b/backend/plugins/github_graphql/tasks/job_collector.go
index 73c914a8b..af25d5897 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -51,7 +51,11 @@ type GraphqlQueryCheckSuite struct {
                // equal to Job in rest
                CheckRuns struct {
                        TotalCount int
-                       Nodes      []struct {
+                       PageInfo   struct {
+                               EndCursor   string `graphql:"endCursor"`
+                               HasNextPage bool   `graphql:"hasNextPage"`
+                       }
+                       Nodes []struct {
                                Id          string
                                Name        string
                                DetailsUrl  string
@@ -79,7 +83,7 @@ type GraphqlQueryCheckSuite struct {
                                        }
                                } `graphql:"steps(first: 50)"`
                        }
-               } `graphql:"checkRuns(first: 50)"`
+               } `graphql:"checkRuns(first: $pageSize, after: $skipCursor)"`
        } `graphql:"... on CheckSuite"`
 }
 
@@ -95,7 +99,45 @@ var CollectJobsMeta = plugin.SubTaskMeta{
        DomainTypes:      []string{plugin.DOMAIN_TYPE_CICD},
 }
 
-var _ plugin.SubTaskEntryPoint = CollectAccount
+var _ plugin.SubTaskEntryPoint = CollectJobs
+
+func getPageInfo(query interface{}, args *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
+       queryWrapper := query.(*GraphqlQueryCheckRunWrapper)
+       hasNextPage := false
+       endCursor := ""
+       for _, node := range queryWrapper.Node {
+               if node.CheckSuite.CheckRuns.PageInfo.HasNextPage {
+                       hasNextPage = true
+                       endCursor = node.CheckSuite.CheckRuns.PageInfo.EndCursor
+                       break
+               }
+       }
+       return &helper.GraphqlQueryPageInfo{
+               EndCursor:   endCursor,
+               HasNextPage: hasNextPage,
+       }, nil
+}
+
+func buildQuery(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
+       query := &GraphqlQueryCheckRunWrapper{}
+       if reqData == nil {
+               return query, map[string]interface{}{}, nil
+       }
+       workflowRuns := reqData.Input.([]interface{})
+       checkSuiteIds := []map[string]interface{}{}
+       for _, iWorkflowRuns := range workflowRuns {
+               workflowRun := iWorkflowRuns.(*SimpleWorkflowRun)
+               checkSuiteIds = append(checkSuiteIds, map[string]interface{}{
+                       `id`: graphql.ID(workflowRun.CheckSuiteNodeID),
+               })
+       }
+       variables := map[string]interface{}{
+               "node":       checkSuiteIds,
+               "pageSize":   graphql.Int(reqData.Pager.Size),
+               "skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
+       }
+       return query, variables, nil
+}
 
 func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
        db := taskCtx.GetDal()
@@ -137,26 +179,10 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
 
        err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{
                Input:         iterator,
-               InputStep:     20,
+               InputStep:     10,
                GraphqlClient: data.GraphqlClient,
-               BuildQuery: func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
-                       query := &GraphqlQueryCheckRunWrapper{}
-                       if reqData == nil {
-                               return query, map[string]interface{}{}, nil
-                       }
-                       workflowRuns := reqData.Input.([]interface{})
-                       checkSuiteIds := []map[string]interface{}{}
-                       for _, iWorkflowRuns := range workflowRuns {
-                               workflowRun := iWorkflowRuns.(*SimpleWorkflowRun)
-                               checkSuiteIds = append(checkSuiteIds, map[string]interface{}{
-                                       `id`: graphql.ID(workflowRun.CheckSuiteNodeID),
-                               })
-                       }
-                       variables := map[string]interface{}{
-                               "node": checkSuiteIds,
-                       }
-                       return query, variables, nil
-               },
+               BuildQuery:    buildQuery,
+               GetPageInfo:   getPageInfo,
                ResponseParser: func(queryWrapper any) (messages []json.RawMessage, err errors.Error) {
                        query := queryWrapper.(*GraphqlQueryCheckRunWrapper)
                        for _, node := range query.Node {
@@ -168,12 +194,13 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
                                        if apiCollector.GetSince() != nil && !apiCollector.GetSince().Before(*updatedAt) {
                                                return messages, helper.ErrFinishCollect
                                        }
-                                       messages = append(messages, errors.Must1(json.Marshal(node)))
+                                       messages = append(messages, errors.Must1(json.Marshal(checkRun)))
                                }
                        }
                        return
                },
                IgnoreQueryErrors: true,
+               PageSize:          20,
        })
        if err != nil {
                return err

Reply via email to