This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch release-v1.0
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/release-v1.0 by this push:
new 7beae1830 fix: add pagination to job_collector task (#8240)
7beae1830 is described below
commit 7beae1830d053e85b72cc87fece6b3a52b73f8c6
Author: Claudio Mascaro <[email protected]>
AuthorDate: Tue Dec 17 23:39:45 2024 -0300
fix: add pagination to job_collector task (#8240)
---
.../plugins/github_graphql/tasks/job_collector.go | 73 +++++++++++++++-------
1 file changed, 50 insertions(+), 23 deletions(-)
diff --git a/backend/plugins/github_graphql/tasks/job_collector.go
b/backend/plugins/github_graphql/tasks/job_collector.go
index 73c914a8b..af25d5897 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -51,7 +51,11 @@ type GraphqlQueryCheckSuite struct {
// equal to Job in rest
CheckRuns struct {
TotalCount int
- Nodes []struct {
+ PageInfo struct {
+ EndCursor string `graphql:"endCursor"`
+ HasNextPage bool `graphql:"hasNextPage"`
+ }
+ Nodes []struct {
Id string
Name string
DetailsUrl string
@@ -79,7 +83,7 @@ type GraphqlQueryCheckSuite struct {
}
} `graphql:"steps(first: 50)"`
}
- } `graphql:"checkRuns(first: 50)"`
+ } `graphql:"checkRuns(first: $pageSize, after: $skipCursor)"`
} `graphql:"... on CheckSuite"`
}
@@ -95,7 +99,45 @@ var CollectJobsMeta = plugin.SubTaskMeta{
DomainTypes: []string{plugin.DOMAIN_TYPE_CICD},
}
-var _ plugin.SubTaskEntryPoint = CollectAccount
+var _ plugin.SubTaskEntryPoint = CollectJobs // compile-time check that CollectJobs matches plugin.SubTaskEntryPoint (the removed line asserted CollectAccount instead)
+
+func getPageInfo(query interface{}, args *helper.GraphqlCollectorArgs)
(*helper.GraphqlQueryPageInfo, error) { // getPageInfo returns the checkRuns page info of the first check suite in the batch that still has more check runs; args is unused
+ queryWrapper := query.(*GraphqlQueryCheckRunWrapper) // panics if query is not a *GraphqlQueryCheckRunWrapper (the type buildQuery returns)
+ hasNextPage := false
+ endCursor := ""
+ for _, node := range queryWrapper.Node {
+ if node.CheckSuite.CheckRuns.PageInfo.HasNextPage { // NOTE(review): only the first suite with more check runs is considered; later suites' cursors are ignored
+ hasNextPage = true
+ endCursor = node.CheckSuite.CheckRuns.PageInfo.EndCursor
+ break // NOTE(review): buildQuery sends a single $skipCursor for every check suite in the batch, so this cursor (presumably fed back as Pager.SkipCursor) is also applied to other suites' checkRuns — confirm no runs are skipped or duplicated
+ }
+ }
+ return &helper.GraphqlQueryPageInfo{ // HasNextPage=false, EndCursor="" when no suite in the batch has more check runs
+ EndCursor: endCursor,
+ HasNextPage: hasNextPage,
+ }, nil
+}
+
+func buildQuery(reqData *helper.GraphqlRequestData) (interface{},
map[string]interface{}, error) { // buildQuery returns the batched check-suite query plus its variables: one node ID per workflow run in reqData.Input, and page size / cursor from reqData.Pager
+ query := &GraphqlQueryCheckRunWrapper{}
+ if reqData == nil { // no request data: return the bare query with an empty variables map
+ return query, map[string]interface{}{}, nil
+ }
+ workflowRuns := reqData.Input.([]interface{}) // panics unless Input is a []interface{}
+ checkSuiteIds := []map[string]interface{}{}
+ for _, iWorkflowRuns := range workflowRuns {
+ workflowRun := iWorkflowRuns.(*SimpleWorkflowRun) // panics unless each element is a *SimpleWorkflowRun
+ checkSuiteIds = append(checkSuiteIds, map[string]interface{}{
+ `id`: graphql.ID(workflowRun.CheckSuiteNodeID),
+ })
+ }
+ variables := map[string]interface{}{
+ "node": checkSuiteIds,
+ "pageSize": graphql.Int(reqData.Pager.Size),
+ "skipCursor": (*graphql.String)(reqData.Pager.SkipCursor), // one cursor shared by every check suite in this batch; a nil SkipCursor stays a nil pointer (presumably sent as null on the first page — confirm)
+ }
+ return query, variables, nil
+}
func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
db := taskCtx.GetDal()
@@ -137,26 +179,10 @@ func CollectJobs(taskCtx plugin.SubTaskContext)
errors.Error {
err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{
Input: iterator,
- InputStep: 20,
+ InputStep: 10,
GraphqlClient: data.GraphqlClient,
- BuildQuery: func(reqData *helper.GraphqlRequestData)
(interface{}, map[string]interface{}, error) {
- query := &GraphqlQueryCheckRunWrapper{}
- if reqData == nil {
- return query, map[string]interface{}{}, nil
- }
- workflowRuns := reqData.Input.([]interface{})
- checkSuiteIds := []map[string]interface{}{}
- for _, iWorkflowRuns := range workflowRuns {
- workflowRun :=
iWorkflowRuns.(*SimpleWorkflowRun)
- checkSuiteIds = append(checkSuiteIds,
map[string]interface{}{
- `id`:
graphql.ID(workflowRun.CheckSuiteNodeID),
- })
- }
- variables := map[string]interface{}{
- "node": checkSuiteIds,
- }
- return query, variables, nil
- },
+ BuildQuery: buildQuery,
+ GetPageInfo: getPageInfo,
ResponseParser: func(queryWrapper any) (messages
[]json.RawMessage, err errors.Error) {
query := queryWrapper.(*GraphqlQueryCheckRunWrapper)
for _, node := range query.Node {
@@ -168,12 +194,13 @@ func CollectJobs(taskCtx plugin.SubTaskContext)
errors.Error {
if apiCollector.GetSince() != nil &&
!apiCollector.GetSince().Before(*updatedAt) {
return messages,
helper.ErrFinishCollect
}
- messages = append(messages,
errors.Must1(json.Marshal(node)))
+ messages = append(messages,
errors.Must1(json.Marshal(checkRun)))
}
}
return
},
IgnoreQueryErrors: true,
+ PageSize: 20,
})
if err != nil {
return err