This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 40669df5a refactor: StatefulApiCollector adopts CollectorStateManager
(#7324)
40669df5a is described below
commit 40669df5a46c144f2478876fa0da613b9167a0f7
Author: Klesh Wong <[email protected]>
AuthorDate: Mon Apr 15 16:11:01 2024 +0800
refactor: StatefulApiCollector adopts CollectorStateManager (#7324)
---
...tor_with_state.go => api_collector_stateful.go} | 155 ++++-----------------
.../plugins/azuredevops_go/tasks/pr_collector.go | 15 +-
backend/plugins/bitbucket/tasks/api_common.go | 24 ++--
.../plugins/bitbucket_server/tasks/api_common.go | 6 +-
backend/plugins/github/tasks/cicd_job_collector.go | 10 +-
backend/plugins/github/tasks/comment_collector.go | 10 +-
backend/plugins/github/tasks/commit_collector.go | 10 +-
backend/plugins/github/tasks/issue_collector.go | 10 +-
.../plugins/github/tasks/pr_commit_collector.go | 10 +-
.../plugins/github/tasks/pr_review_collector.go | 10 +-
.../github/tasks/pr_review_comment_collector.go | 10 +-
.../github_graphql/tasks/deployment_collector.go | 8 +-
.../github_graphql/tasks/issue_collector.go | 8 +-
.../plugins/github_graphql/tasks/job_collector.go | 10 +-
.../plugins/github_graphql/tasks/pr_collector.go | 8 +-
.../plugins/gitlab/tasks/deployment_collector.go | 14 +-
backend/plugins/gitlab/tasks/issue_collector.go | 4 +-
backend/plugins/gitlab/tasks/mr_collector.go | 10 +-
.../plugins/gitlab/tasks/mr_detail_collector.go | 6 +-
backend/plugins/gitlab/tasks/pipeline_collector.go | 10 +-
.../gitlab/tasks/pipeline_detail_collector.go | 6 +-
backend/plugins/gitlab/tasks/shared.go | 8 +-
.../plugins/gitlab/tasks/trigger_job_collector.go | 16 +--
backend/plugins/jenkins/tasks/stage_collector.go | 10 +-
.../jira/tasks/development_panel_collector.go | 10 +-
backend/plugins/jira/tasks/epic_collector.go | 10 +-
.../jira/tasks/issue_changelog_collector.go | 10 +-
backend/plugins/jira/tasks/issue_collector.go | 10 +-
.../plugins/jira/tasks/issue_comment_collector.go | 10 +-
backend/plugins/jira/tasks/remotelink_collector.go | 10 +-
backend/plugins/jira/tasks/worklog_collector.go | 16 ++-
.../plugins/tapd/tasks/bug_changelog_collector.go | 10 +-
backend/plugins/tapd/tasks/bug_collector.go | 10 +-
backend/plugins/tapd/tasks/bug_commit_collector.go | 10 +-
backend/plugins/tapd/tasks/iteration_collector.go | 10 +-
backend/plugins/tapd/tasks/story_bug_collector.go | 10 +-
.../tapd/tasks/story_changelog_collector.go | 4 +-
backend/plugins/tapd/tasks/story_collector.go | 10 +-
.../plugins/tapd/tasks/story_commit_collector.go | 10 +-
.../plugins/tapd/tasks/task_changelog_collector.go | 10 +-
backend/plugins/tapd/tasks/task_collector.go | 6 +-
.../plugins/tapd/tasks/task_commit_collector.go | 10 +-
backend/plugins/tapd/tasks/worklog_collector.go | 10 +-
.../plugins/zentao/tasks/bug_commits_collector.go | 10 +-
.../zentao/tasks/story_commits_collector.go | 10 +-
.../plugins/zentao/tasks/task_commits_collector.go | 10 +-
46 files changed, 255 insertions(+), 349 deletions(-)
diff --git a/backend/helpers/pluginhelper/api/api_collector_with_state.go
b/backend/helpers/pluginhelper/api/api_collector_stateful.go
similarity index 67%
rename from backend/helpers/pluginhelper/api/api_collector_with_state.go
rename to backend/helpers/pluginhelper/api/api_collector_stateful.go
index 513c442cb..2c1086bbc 100644
--- a/backend/helpers/pluginhelper/api/api_collector_with_state.go
+++ b/backend/helpers/pluginhelper/api/api_collector_stateful.go
@@ -21,172 +21,74 @@ import (
"encoding/json"
"net/http"
"net/url"
- "reflect"
"time"
- "github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
- "github.com/apache/incubator-devlake/core/models"
"github.com/apache/incubator-devlake/core/plugin"
)
-// ApiCollectorStateManager save collector state in framework table
-type ApiCollectorStateManager struct {
+// StatefulApiCollector runs multiple collectors as a single subtask and
maintains the state of the collector
+// mainly the time range to collect across multiple collections. It is useful
when you need to support timeAfter
+// and diff sync for APIs that do not support filtering by the updated date.
+type StatefulApiCollector struct {
RawDataSubTaskArgs
+ CollectorStateManager
// *ApiCollector
// *GraphqlCollector
- subtasks []plugin.SubTask
- newState models.CollectorLatestState
- IsIncremental bool
- Since *time.Time
- Before *time.Time
+ nestedCollectors []plugin.SubTask
}
-type CollectorOptions struct {
- TimeAfter string `json:"timeAfter,omitempty"
mapstructure:"timeAfter,omitempty"`
-}
-
-// NewStatefulApiCollector create a new ApiCollectorStateManager
-func NewStatefulApiCollector(args RawDataSubTaskArgs)
(*ApiCollectorStateManager, errors.Error) {
- db := args.Ctx.GetDal()
+// NewStatefulApiCollector create a new StatefulApiCollector
+func NewStatefulApiCollector(args RawDataSubTaskArgs) (*StatefulApiCollector,
errors.Error) {
syncPolicy := args.Ctx.TaskContext().SyncPolicy()
rawDataSubTask, err := NewRawDataSubTask(args)
if err != nil {
- return nil, errors.Default.Wrap(err, "Couldn't resolve raw
subtask args")
- }
-
- // get optionTimeAfter from options
- data := args.Ctx.GetData()
- value := reflect.ValueOf(data)
- if value.Kind() == reflect.Ptr && value.Elem().Kind() == reflect.Struct
{
- options := value.Elem().FieldByName("Options")
- if options.IsValid() && options.Kind() == reflect.Ptr &&
options.Elem().Kind() == reflect.Struct {
- collectorOptions :=
options.Elem().FieldByName("CollectorOptions")
- if collectorOptions.IsValid() &&
collectorOptions.Kind() == reflect.Struct {
- timeAfter :=
collectorOptions.FieldByName("TimeAfter")
- if timeAfter.IsValid() && timeAfter.Kind() ==
reflect.String && timeAfter.String() != "" {
- optionTimeAfter, parseErr :=
time.Parse(time.RFC3339, timeAfter.String())
- if parseErr != nil {
- return nil,
errors.Default.Wrap(parseErr, "Failed to parse timeAfter!")
- }
- if syncPolicy != nil {
- syncPolicy.TimeAfter =
&optionTimeAfter
- } else {
- syncPolicy = &models.SyncPolicy{
- TimeAfter:
&optionTimeAfter,
- }
- }
- }
- }
- }
+ return nil, err
}
-
- // CollectorLatestState retrieves the latest collector state from the
database
- oldState := models.CollectorLatestState{}
- err = db.First(&oldState, dal.Where(`raw_data_table = ? AND
raw_data_params = ?`, rawDataSubTask.table, rawDataSubTask.params))
+ stateManager, err := NewCollectorStateManager(args.Ctx, syncPolicy,
rawDataSubTask.table, rawDataSubTask.params)
if err != nil {
- if db.IsErrorNotFound(err) {
- oldState = models.CollectorLatestState{
- RawDataTable: rawDataSubTask.table,
- RawDataParams: rawDataSubTask.params,
- }
- } else {
- return nil, errors.Default.Wrap(err, "failed to load
JiraLatestCollectorMeta")
- }
- }
- // Extract timeAfter and latestSuccessStart from old state
- oldTimeAfter := oldState.TimeAfter
- oldLatestSuccessStart := oldState.LatestSuccessStart
-
- // Calculate incremental and since based on syncPolicy and old state
- var isIncremental bool
- var since *time.Time
-
- if oldLatestSuccessStart == nil {
- // 1. If no oldState.LatestSuccessStart, not incremental and
since is syncPolicy.TimeAfter
- isIncremental = false
- if syncPolicy != nil {
- since = syncPolicy.TimeAfter
- }
- } else if syncPolicy == nil {
- // 2. If no syncPolicy, incremental and since is
oldState.LatestSuccessStart
- isIncremental = true
- since = oldLatestSuccessStart
- } else if syncPolicy.FullSync {
- // 3. If fullSync true, not incremental and since is
syncPolicy.TimeAfter
- isIncremental = false
- since = syncPolicy.TimeAfter
- } else if syncPolicy.TimeAfter == nil {
- // 4. If no syncPolicy TimeAfter, incremental and since is
oldState.LatestSuccessStart
- isIncremental = true
- since = oldLatestSuccessStart
- } else {
- // 5. If syncPolicy.TimeAfter not nil
- if oldTimeAfter != nil &&
syncPolicy.TimeAfter.Before(*oldTimeAfter) {
- // 4.1 If oldTimeAfter not nil and syncPolicy.TimeAfter
before oldTimeAfter, incremental is false and since is syncPolicy.TimeAfter
- isIncremental = false
- since = syncPolicy.TimeAfter
- } else {
- // 4.2 If oldTimeAfter nil or syncPolicy.TimeAfter
after oldTimeAfter, incremental is true and since is oldState.LatestSuccessStart
- isIncremental = true
- since = oldLatestSuccessStart
- }
- }
-
- currentTime := time.Now()
- oldState.LatestSuccessStart = &currentTime
- if syncPolicy != nil {
- oldState.TimeAfter = syncPolicy.TimeAfter
- if syncPolicy.TimeAfter != nil && oldTimeAfter != nil &&
(oldTimeAfter).Before(*syncPolicy.TimeAfter) && !syncPolicy.FullSync {
- oldState.TimeAfter = oldTimeAfter
- }
+ return nil, err
}
-
- return &ApiCollectorStateManager{
- RawDataSubTaskArgs: args,
- newState: oldState,
- IsIncremental: isIncremental,
- Since: since,
- Before: &currentTime,
+ return &StatefulApiCollector{
+ RawDataSubTaskArgs: args,
+ CollectorStateManager: *stateManager,
}, nil
-
}
-// InitCollector init the embedded collector
-func (m *ApiCollectorStateManager) InitCollector(args ApiCollectorArgs)
errors.Error {
+// InitCollector appends a new collector to the list
+func (m *StatefulApiCollector) InitCollector(args ApiCollectorArgs)
errors.Error {
args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
- args.Incremental = args.Incremental || m.IsIncremental
+ args.Incremental = m.CollectorStateManager.IsIncremental()
apiCollector, err := NewApiCollector(args)
if err != nil {
return err
}
- m.subtasks = append(m.subtasks, apiCollector)
+ m.nestedCollectors = append(m.nestedCollectors, apiCollector)
return nil
}
-// InitGraphQLCollector init the embedded collector
-func (m *ApiCollectorStateManager) InitGraphQLCollector(args
GraphqlCollectorArgs) errors.Error {
+// InitGraphQLCollector appends a new GraphQL collector to the list
+func (m *StatefulApiCollector) InitGraphQLCollector(args GraphqlCollectorArgs)
errors.Error {
args.RawDataSubTaskArgs = m.RawDataSubTaskArgs
- args.Incremental = args.Incremental || m.IsIncremental
+ args.Incremental = m.CollectorStateManager.IsIncremental()
graphqlCollector, err := NewGraphqlCollector(args)
if err != nil {
return err
}
- m.subtasks = append(m.subtasks, graphqlCollector)
+ m.nestedCollectors = append(m.nestedCollectors, graphqlCollector)
return nil
}
-// Execute the embedded collector and record execute state
-func (m *ApiCollectorStateManager) Execute() errors.Error {
- for _, subtask := range m.subtasks {
+// Execute all nested collectors and save the state if all collectors succeed
+func (m *StatefulApiCollector) Execute() errors.Error {
+ for _, subtask := range m.nestedCollectors {
err := subtask.Execute()
if err != nil {
return err
}
}
- db := m.Ctx.GetDal()
- return db.CreateOrUpdate(&m.newState)
+ return m.CollectorStateManager.Close()
}
// NewStatefulApiCollectorForFinalizableEntity aims to add timeFilter/diffSync
support for
@@ -222,8 +124,8 @@ func NewStatefulApiCollectorForFinalizableEntity(args
FinalizableApiCollectorArg
return nil, err
}
- createdAfter := manager.Since
- isIncremental := manager.IsIncremental
+ createdAfter := manager.CollectorStateManager.GetSince()
+ isIncremental := manager.CollectorStateManager.IsIncremental()
// step 1: create a collector to collect newly added records
err = manager.InitCollector(ApiCollectorArgs{
@@ -329,7 +231,6 @@ func NewStatefulApiCollectorForFinalizableEntity(args
FinalizableApiCollectorArg
type FinalizableApiCollectorArgs struct {
RawDataSubTaskArgs
ApiClient RateLimitedApiClient
- TimeAfter *time.Time // leave it be nil to disable time
filter
CollectNewRecordsByList FinalizableApiCollectorListArgs
CollectUnfinishedDetails *FinalizableApiCollectorDetailArgs
}
diff --git a/backend/plugins/azuredevops_go/tasks/pr_collector.go
b/backend/plugins/azuredevops_go/tasks/pr_collector.go
index 7933b3d46..30a63dba7 100644
--- a/backend/plugins/azuredevops_go/tasks/pr_collector.go
+++ b/backend/plugins/azuredevops_go/tasks/pr_collector.go
@@ -19,11 +19,12 @@ package tasks
import (
"fmt"
+ "net/url"
+ "time"
+
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/plugin"
"github.com/apache/incubator-devlake/helpers/pluginhelper/api"
- "net/url"
- "time"
)
func init() {
@@ -51,12 +52,12 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext)
errors.Error {
Options: data.Options,
}
- collectorWithState, err :=
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
+ apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
- err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+ err = apiCollector.InitCollector(api.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
PageSize: 100,
@@ -67,9 +68,9 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("$skip", fmt.Sprint(reqData.Pager.Skip))
query.Set("$top", fmt.Sprint(reqData.Pager.Size))
- if collectorWithState.Since != nil {
+ if apiCollector.GetSince() != nil {
query.Set("searchCriteria.queryTimeRangeType",
"created")
- query.Set("searchCriteria.minTime",
collectorWithState.Since.Format(time.RFC3339))
+ query.Set("searchCriteria.minTime",
apiCollector.GetSince().Format(time.RFC3339))
}
return query, nil
},
@@ -81,5 +82,5 @@ func CollectApiPullRequests(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/bitbucket/tasks/api_common.go
b/backend/plugins/bitbucket/tasks/api_common.go
index 8990fdbaf..352f41cb9 100644
--- a/backend/plugins/bitbucket/tasks/api_common.go
+++ b/backend/plugins/bitbucket/tasks/api_common.go
@@ -93,7 +93,7 @@ func GetQuery(reqData *api.RequestData) (url.Values,
errors.Error) {
}
// GetQueryCreatedAndUpdated is a common GeyQuery for timeFilter and
incremental
-func GetQueryCreatedAndUpdated(fields string, collectorWithState
*api.ApiCollectorStateManager) func(reqData *api.RequestData) (url.Values,
errors.Error) {
+func GetQueryCreatedAndUpdated(fields string, apiCollector
*api.StatefulApiCollector) func(reqData *api.RequestData) (url.Values,
errors.Error) {
return func(reqData *api.RequestData) (url.Values, errors.Error) {
query, err := GetQuery(reqData)
if err != nil {
@@ -102,8 +102,8 @@ func GetQueryCreatedAndUpdated(fields string,
collectorWithState *api.ApiCollect
query.Set("fields", fields)
query.Set("sort", "created_on")
- if collectorWithState.Since != nil {
- query.Set("q", fmt.Sprintf("updated_on>=%s",
collectorWithState.Since.Format(time.RFC3339)))
+ if apiCollector.GetSince() != nil {
+ query.Set("q", fmt.Sprintf("updated_on>=%s",
apiCollector.GetSince().Format(time.RFC3339)))
}
return query, nil
}
@@ -164,7 +164,7 @@ func GetRawMessageFromResponse(res *http.Response)
([]json.RawMessage, errors.Er
return rawMessages.Values, nil
}
-func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, collectorWithState
*api.ApiCollectorStateManager) (*api.DalCursorIterator, errors.Error) {
+func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, sac
*api.StatefulApiCollector) (*api.DalCursorIterator, errors.Error) {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*BitbucketTaskData)
clauses := []dal.Clause{
@@ -175,8 +175,8 @@ func GetPullRequestsIterator(taskCtx plugin.SubTaskContext,
collectorWithState *
data.Options.FullName, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncremental && collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("bitbucket_updated_at > ?",
*collectorWithState.Since))
+ if sac.IsIncremental() && sac.GetSince() != nil {
+ clauses = append(clauses, dal.Where("bitbucket_updated_at > ?",
*sac.GetSince()))
}
// construct the input iterator
@@ -188,7 +188,7 @@ func GetPullRequestsIterator(taskCtx plugin.SubTaskContext,
collectorWithState *
return api.NewDalCursorIterator(db, cursor,
reflect.TypeOf(BitbucketInput{}))
}
-func GetIssuesIterator(taskCtx plugin.SubTaskContext, collectorWithState
*api.ApiCollectorStateManager) (*api.DalCursorIterator, errors.Error) {
+func GetIssuesIterator(taskCtx plugin.SubTaskContext, sac
*api.StatefulApiCollector) (*api.DalCursorIterator, errors.Error) {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*BitbucketTaskData)
clauses := []dal.Clause{
@@ -199,8 +199,8 @@ func GetIssuesIterator(taskCtx plugin.SubTaskContext,
collectorWithState *api.Ap
data.Options.FullName, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncremental && collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("bitbucket_updated_at > ?",
*collectorWithState.Since))
+ if sac.IsIncremental() && sac.GetSince() != nil {
+ clauses = append(clauses, dal.Where("bitbucket_updated_at > ?",
*sac.GetSince()))
}
// construct the input iterator
cursor, err := db.Cursor(clauses...)
@@ -211,7 +211,7 @@ func GetIssuesIterator(taskCtx plugin.SubTaskContext,
collectorWithState *api.Ap
return api.NewDalCursorIterator(db, cursor,
reflect.TypeOf(BitbucketInput{}))
}
-func GetPipelinesIterator(taskCtx plugin.SubTaskContext, collectorWithState
*api.ApiCollectorStateManager) (*api.DalCursorIterator, errors.Error) {
+func GetPipelinesIterator(taskCtx plugin.SubTaskContext, sac
*api.StatefulApiCollector) (*api.DalCursorIterator, errors.Error) {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*BitbucketTaskData)
clauses := []dal.Clause{
@@ -222,8 +222,8 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext,
collectorWithState *api
data.Options.FullName, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncremental && collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("bitbucket_complete_on >
?", *collectorWithState.Since))
+ if sac.IsIncremental() && sac.GetSince() != nil {
+ clauses = append(clauses, dal.Where("bitbucket_complete_on >
?", *sac.GetSince()))
}
// construct the input iterator
cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/bitbucket_server/tasks/api_common.go
b/backend/plugins/bitbucket_server/tasks/api_common.go
index 55e99ce5b..0b2ad1f68 100644
--- a/backend/plugins/bitbucket_server/tasks/api_common.go
+++ b/backend/plugins/bitbucket_server/tasks/api_common.go
@@ -119,7 +119,7 @@ func GetRawMessageFromResponse(res *http.Response)
([]json.RawMessage, errors.Er
return rawMessages.Values, nil
}
-func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, collectorWithState
*helper.ApiCollectorStateManager) (*helper.DalCursorIterator, errors.Error) {
+func GetPullRequestsIterator(taskCtx plugin.SubTaskContext, apiCollector
*helper.StatefulApiCollector) (*helper.DalCursorIterator, errors.Error) {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*BitbucketServerTaskData)
clauses := []dal.Clause{
@@ -131,8 +131,8 @@ func GetPullRequestsIterator(taskCtx plugin.SubTaskContext,
collectorWithState *
),
}
- if collectorWithState.IsIncremental && collectorWithState.Since != nil {
- clauses = append(clauses,
dal.Where("bpr.bitbucket_server_updated_at > ?", *collectorWithState.Since))
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+ clauses = append(clauses,
dal.Where("bpr.bitbucket_server_updated_at > ?", *apiCollector.GetSince()))
}
// construct the input iterator
diff --git a/backend/plugins/github/tasks/cicd_job_collector.go
b/backend/plugins/github/tasks/cicd_job_collector.go
index dda4b90c0..60780339f 100644
--- a/backend/plugins/github/tasks/cicd_job_collector.go
+++ b/backend/plugins/github/tasks/cicd_job_collector.go
@@ -52,7 +52,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
data := taskCtx.GetData().(*GithubTaskData)
// state manager
- collectorWithState, err :=
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+ apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: GithubApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -73,8 +73,8 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
data.Options.GithubId, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncremental && collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("github_updated_at > ?",
collectorWithState.Since))
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+ clauses = append(clauses, dal.Where("github_updated_at > ?",
apiCollector.GetSince()))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -85,7 +85,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
return err
}
// collect jobs
- err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+ err = apiCollector.InitCollector(api.ApiCollectorArgs{
RawDataSubTaskArgs: api.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: GithubApiParams{
@@ -118,7 +118,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
type SimpleGithubRun struct {
diff --git a/backend/plugins/github/tasks/comment_collector.go
b/backend/plugins/github/tasks/comment_collector.go
index e06f023f4..9f40e7736 100644
--- a/backend/plugins/github/tasks/comment_collector.go
+++ b/backend/plugins/github/tasks/comment_collector.go
@@ -46,7 +46,7 @@ var CollectApiCommentsMeta = plugin.SubTaskMeta{
func CollectApiComments(taskCtx plugin.SubTaskContext) errors.Error {
data := taskCtx.GetData().(*GithubTaskData)
- collectorWithState, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+ apiCollector, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: GithubApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -58,15 +58,15 @@ func CollectApiComments(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ err = apiCollector.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
UrlTemplate: "repos/{{ .Params.Name }}/issues/comments",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("state", "all")
- if collectorWithState.Since != nil {
- query.Set("since",
collectorWithState.Since.String())
+ if apiCollector.GetSince() != nil {
+ query.Set("since",
apiCollector.GetSince().String())
}
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("direction", "asc")
@@ -89,5 +89,5 @@ func CollectApiComments(taskCtx plugin.SubTaskContext)
errors.Error {
return errors.Default.Wrap(err, "error collecting github
comments")
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/github/tasks/commit_collector.go
b/backend/plugins/github/tasks/commit_collector.go
index b735a508d..019060c17 100644
--- a/backend/plugins/github/tasks/commit_collector.go
+++ b/backend/plugins/github/tasks/commit_collector.go
@@ -46,7 +46,7 @@ var CollectApiCommitsMeta = plugin.SubTaskMeta{
func CollectApiCommits(taskCtx plugin.SubTaskContext) errors.Error {
data := taskCtx.GetData().(*GithubTaskData)
- collectorWithState, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+ apiCollector, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: GithubApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -58,7 +58,7 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ err = apiCollector.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
/*
@@ -77,8 +77,8 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext)
errors.Error {
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("state", "all")
- if collectorWithState.Since != nil {
- query.Set("since",
collectorWithState.Since.String())
+ if apiCollector.GetSince() != nil {
+ query.Set("since",
apiCollector.GetSince().String())
}
query.Set("direction", "asc")
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
@@ -117,5 +117,5 @@ func CollectApiCommits(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/github/tasks/issue_collector.go
b/backend/plugins/github/tasks/issue_collector.go
index 4e9b25d21..5558da697 100644
--- a/backend/plugins/github/tasks/issue_collector.go
+++ b/backend/plugins/github/tasks/issue_collector.go
@@ -46,7 +46,7 @@ var CollectApiIssuesMeta = plugin.SubTaskMeta{
func CollectApiIssues(taskCtx plugin.SubTaskContext) errors.Error {
data := taskCtx.GetData().(*GithubTaskData)
- collectorWithState, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+ apiCollector, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: GithubApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -58,7 +58,7 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ err = apiCollector.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
/*
@@ -77,8 +77,8 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
query.Set("state", "all")
- if collectorWithState.Since != nil {
- query.Set("since",
collectorWithState.Since.String())
+ if apiCollector.GetSince() != nil {
+ query.Set("since",
apiCollector.GetSince().String())
}
query.Set("direction", "asc")
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
@@ -117,5 +117,5 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/github/tasks/pr_commit_collector.go
b/backend/plugins/github/tasks/pr_commit_collector.go
index 487e6ac14..0604816d8 100644
--- a/backend/plugins/github/tasks/pr_commit_collector.go
+++ b/backend/plugins/github/tasks/pr_commit_collector.go
@@ -61,7 +61,7 @@ func CollectApiPullRequestCommits(taskCtx
plugin.SubTaskContext) errors.Error {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*GithubTaskData)
- collectorWithState, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+ apiCollector, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: GithubApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -78,10 +78,10 @@ func CollectApiPullRequestCommits(taskCtx
plugin.SubTaskContext) errors.Error {
dal.From(models.GithubPullRequest{}.TableName()),
dal.Where("repo_id = ? and connection_id=?",
data.Options.GithubId, data.Options.ConnectionId),
}
- if collectorWithState.IsIncremental && collectorWithState.Since != nil {
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
clauses = append(
clauses,
- dal.Where("github_updated_at > ?",
collectorWithState.Since),
+ dal.Where("github_updated_at > ?",
apiCollector.GetSince()),
)
}
@@ -95,7 +95,7 @@ func CollectApiPullRequestCommits(taskCtx
plugin.SubTaskContext) errors.Error {
if err != nil {
return err
}
- err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ err = apiCollector.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
Input: iterator,
@@ -141,5 +141,5 @@ func CollectApiPullRequestCommits(taskCtx
plugin.SubTaskContext) errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/github/tasks/pr_review_collector.go
b/backend/plugins/github/tasks/pr_review_collector.go
index 60430c583..e4a16649d 100644
--- a/backend/plugins/github/tasks/pr_review_collector.go
+++ b/backend/plugins/github/tasks/pr_review_collector.go
@@ -53,7 +53,7 @@ func CollectApiPullRequestReviews(taskCtx
plugin.SubTaskContext) errors.Error {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*GithubTaskData)
- collectorWithState, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+ apiCollector, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: GithubApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -70,10 +70,10 @@ func CollectApiPullRequestReviews(taskCtx
plugin.SubTaskContext) errors.Error {
dal.From(models.GithubPullRequest{}.TableName()),
dal.Where("repo_id = ? and connection_id=?",
data.Options.GithubId, data.Options.ConnectionId),
}
- if collectorWithState.IsIncremental && collectorWithState.Since != nil {
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
clauses = append(
clauses,
- dal.Where("github_updated_at > ?",
collectorWithState.Since),
+ dal.Where("github_updated_at > ?",
apiCollector.GetSince()),
)
}
@@ -89,7 +89,7 @@ func CollectApiPullRequestReviews(taskCtx
plugin.SubTaskContext) errors.Error {
return err
}
- err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ err = apiCollector.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
Input: iterator,
@@ -117,5 +117,5 @@ func CollectApiPullRequestReviews(taskCtx
plugin.SubTaskContext) errors.Error {
if err != nil {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/github/tasks/pr_review_comment_collector.go
b/backend/plugins/github/tasks/pr_review_comment_collector.go
index d53371444..d6dfb296f 100644
--- a/backend/plugins/github/tasks/pr_review_comment_collector.go
+++ b/backend/plugins/github/tasks/pr_review_comment_collector.go
@@ -49,7 +49,7 @@ var CollectApiPrReviewCommentsMeta = plugin.SubTaskMeta{
func CollectPrReviewComments(taskCtx plugin.SubTaskContext) errors.Error {
data := taskCtx.GetData().(*GithubTaskData)
- collectorWithState, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+ apiCollector, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: GithubApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -61,7 +61,7 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ err = apiCollector.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
Header: func(reqData *helper.RequestData) (http.Header,
errors.Error) {
@@ -74,8 +74,8 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext)
errors.Error {
UrlTemplate: "repos/{{ .Params.Name }}/pulls/comments",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
- if collectorWithState.Since != nil {
- query.Set("since",
collectorWithState.Since.String())
+ if apiCollector.GetSince() != nil {
+ query.Set("since",
apiCollector.GetSince().String())
}
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("direction", "asc")
@@ -97,5 +97,5 @@ func CollectPrReviewComments(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/github_graphql/tasks/deployment_collector.go
b/backend/plugins/github_graphql/tasks/deployment_collector.go
index 437088629..e3a646baf 100644
--- a/backend/plugins/github_graphql/tasks/deployment_collector.go
+++ b/backend/plugins/github_graphql/tasks/deployment_collector.go
@@ -86,7 +86,7 @@ type GraphqlQueryDeploymentDeployment struct {
// CollectDeployments will request github api via graphql and store the result
into raw layer.
func CollectDeployments(taskCtx plugin.SubTaskContext) errors.Error {
data := taskCtx.GetData().(*githubTasks.GithubTaskData)
- collectorWithState, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+ apiCollector, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: githubTasks.GithubApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -98,7 +98,7 @@ func CollectDeployments(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- err =
collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
+ err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{
GraphqlClient: data.GraphqlClient,
PageSize: 100,
BuildQuery: func(reqData *helper.GraphqlRequestData)
(interface{}, map[string]interface{}, error) {
@@ -124,7 +124,7 @@ func CollectDeployments(taskCtx plugin.SubTaskContext)
errors.Error {
query := iQuery.(*GraphqlQueryDeploymentWrapper)
deployments := query.Repository.Deployments.Deployments
for _, rawL := range deployments {
- if collectorWithState.Since != nil &&
!collectorWithState.Since.Before(rawL.UpdatedAt) {
+ if apiCollector.GetSince() != nil &&
!apiCollector.GetSince().Before(rawL.UpdatedAt) {
return nil, helper.ErrFinishCollect
}
}
@@ -134,5 +134,5 @@ func CollectDeployments(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/github_graphql/tasks/issue_collector.go
b/backend/plugins/github_graphql/tasks/issue_collector.go
index 313b7cbac..dbf1b1e95 100644
--- a/backend/plugins/github_graphql/tasks/issue_collector.go
+++ b/backend/plugins/github_graphql/tasks/issue_collector.go
@@ -82,7 +82,7 @@ var _ plugin.SubTaskEntryPoint = CollectIssues
func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error {
data := taskCtx.GetData().(*githubTasks.GithubTaskData)
- collectorWithState, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+ apiCollector, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: githubTasks.GithubApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -94,7 +94,7 @@ func CollectIssues(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- err =
collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
+ err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{
GraphqlClient: data.GraphqlClient,
PageSize: 10,
BuildQuery: func(reqData *helper.GraphqlRequestData)
(interface{}, map[string]interface{}, error) {
@@ -119,7 +119,7 @@ func CollectIssues(taskCtx plugin.SubTaskContext)
errors.Error {
query := iQuery.(*GraphqlQueryIssueWrapper)
issues := query.Repository.IssueList.Issues
for _, rawL := range issues {
- if collectorWithState.Since != nil &&
!collectorWithState.Since.Before(rawL.UpdatedAt) {
+ if apiCollector.GetSince() != nil &&
!apiCollector.GetSince().Before(rawL.UpdatedAt) {
return nil, helper.ErrFinishCollect
}
}
@@ -130,5 +130,5 @@ func CollectIssues(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/github_graphql/tasks/job_collector.go
b/backend/plugins/github_graphql/tasks/job_collector.go
index 48ec17045..effac75ff 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -100,7 +100,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext)
errors.Error {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*githubTasks.GithubTaskData)
- collectorWithState, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
+ apiCollector, err :=
helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: githubTasks.GithubApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -118,8 +118,8 @@ func CollectJobs(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where("repo_id = ? and connection_id=?",
data.Options.GithubId, data.Options.ConnectionId),
dal.Orderby("github_updated_at DESC"),
}
- if collectorWithState.IsIncremental && collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("github_updated_at > ?",
*collectorWithState.Since))
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+ clauses = append(clauses, dal.Where("github_updated_at > ?",
*apiCollector.GetSince()))
}
cursor, err := db.Cursor(
@@ -134,7 +134,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- err =
collectorWithState.InitGraphQLCollector(helper.GraphqlCollectorArgs{
+ err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{
Input: iterator,
InputStep: 20,
GraphqlClient: data.GraphqlClient,
@@ -168,5 +168,5 @@ func CollectJobs(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/github_graphql/tasks/pr_collector.go
b/backend/plugins/github_graphql/tasks/pr_collector.go
index 2bc887305..903ce6c5b 100644
--- a/backend/plugins/github_graphql/tasks/pr_collector.go
+++ b/backend/plugins/github_graphql/tasks/pr_collector.go
@@ -130,7 +130,7 @@ var _ plugin.SubTaskEntryPoint = CollectPrs
func CollectPrs(taskCtx plugin.SubTaskContext) errors.Error {
data := taskCtx.GetData().(*tasks.GithubTaskData)
var err errors.Error
- collectorWithState, err :=
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+ apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: tasks.GithubApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -142,7 +142,7 @@ func CollectPrs(taskCtx plugin.SubTaskContext) errors.Error
{
return err
}
- err = collectorWithState.InitGraphQLCollector(api.GraphqlCollectorArgs{
+ err = apiCollector.InitGraphQLCollector(api.GraphqlCollectorArgs{
GraphqlClient: data.GraphqlClient,
PageSize: 10,
/*
@@ -170,7 +170,7 @@ func CollectPrs(taskCtx plugin.SubTaskContext) errors.Error
{
query := iQuery.(*GraphqlQueryPrWrapper)
prs := query.Repository.PullRequests.Prs
for _, rawL := range prs {
- if collectorWithState.Since != nil &&
!collectorWithState.Since.Before(rawL.CreatedAt) {
+ if apiCollector.GetSince() != nil &&
!apiCollector.GetSince().Before(rawL.CreatedAt) {
return nil, api.ErrFinishCollect
}
}
@@ -181,5 +181,5 @@ func CollectPrs(taskCtx plugin.SubTaskContext) errors.Error
{
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/gitlab/tasks/deployment_collector.go
b/backend/plugins/gitlab/tasks/deployment_collector.go
index 6c4c2a74a..36ecb60c1 100644
--- a/backend/plugins/gitlab/tasks/deployment_collector.go
+++ b/backend/plugins/gitlab/tasks/deployment_collector.go
@@ -49,11 +49,11 @@ var CollectDeploymentMeta = plugin.SubTaskMeta{
func CollectDeployment(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_DEPLOYMENT)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+ apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
- err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ err = apiCollector.InitCollector(helper.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
PageSize: 100,
@@ -70,11 +70,11 @@ func CollectDeployment(taskCtx plugin.SubTaskContext)
errors.Error {
} else {
query.Set("order_by", "created_at")
}
- if collectorWithState.Since != nil {
- query.Set("updated_after",
collectorWithState.Since.Format(time.RFC3339))
+ if apiCollector.GetSince() != nil {
+ query.Set("updated_after",
apiCollector.GetSince().Format(time.RFC3339))
}
- if collectorWithState.Before != nil {
- query.Set("updated_before",
collectorWithState.Before.Format(time.RFC3339))
+ if apiCollector.GetUntil() != nil {
+ query.Set("updated_before",
apiCollector.GetUntil().Format(time.RFC3339))
}
return query, nil
},
@@ -84,5 +84,5 @@ func CollectDeployment(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/gitlab/tasks/issue_collector.go
b/backend/plugins/gitlab/tasks/issue_collector.go
index 92c3fbc21..9ae57fb4f 100644
--- a/backend/plugins/gitlab/tasks/issue_collector.go
+++ b/backend/plugins/gitlab/tasks/issue_collector.go
@@ -61,8 +61,8 @@ func CollectApiIssues(taskCtx plugin.SubTaskContext)
errors.Error {
*/
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
- if collectorWithState.Since != nil {
- query.Set("updated_after",
collectorWithState.Since.Format(time.RFC3339))
+ if collectorWithState.GetSince() != nil {
+ query.Set("updated_after",
collectorWithState.GetSince().Format(time.RFC3339))
}
query.Set("sort", "asc")
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
diff --git a/backend/plugins/gitlab/tasks/mr_collector.go
b/backend/plugins/gitlab/tasks/mr_collector.go
index 029b19043..320d977d2 100644
--- a/backend/plugins/gitlab/tasks/mr_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_collector.go
@@ -43,12 +43,12 @@ var CollectApiMergeRequestsMeta = plugin.SubTaskMeta{
func CollectApiMergeRequests(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_MERGE_REQUEST_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+ apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
- err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ err = apiCollector.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
UrlTemplate: "projects/{{ .Params.ProjectId
}}/merge_requests",
@@ -59,8 +59,8 @@ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return nil, err
}
- if collectorWithState.Since != nil {
- query.Set("updated_after",
collectorWithState.Since.Format(time.RFC3339))
+ if apiCollector.GetSince() != nil {
+ query.Set("updated_after",
apiCollector.GetSince().Format(time.RFC3339))
}
return query, nil
},
@@ -70,5 +70,5 @@ func CollectApiMergeRequests(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/gitlab/tasks/mr_detail_collector.go
b/backend/plugins/gitlab/tasks/mr_detail_collector.go
index eb4f8f3a6..bfb64f031 100644
--- a/backend/plugins/gitlab/tasks/mr_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/mr_detail_collector.go
@@ -74,7 +74,7 @@ func CollectApiMergeRequestDetails(taskCtx
plugin.SubTaskContext) errors.Error {
return collectorWithState.Execute()
}
-func GetMergeRequestDetailsIterator(taskCtx plugin.SubTaskContext,
collectorWithState *helper.ApiCollectorStateManager)
(*helper.DalCursorIterator, errors.Error) {
+func GetMergeRequestDetailsIterator(taskCtx plugin.SubTaskContext,
apiCollector *helper.StatefulApiCollector) (*helper.DalCursorIterator,
errors.Error) {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*GitlabTaskData)
clauses := []dal.Clause{
@@ -85,8 +85,8 @@ func GetMergeRequestDetailsIterator(taskCtx
plugin.SubTaskContext, collectorWith
data.Options.ProjectId, data.Options.ConnectionId, true,
),
}
- if collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*collectorWithState.Since))
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+ clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*apiCollector.GetSince()))
}
// construct the input iterator
diff --git a/backend/plugins/gitlab/tasks/pipeline_collector.go
b/backend/plugins/gitlab/tasks/pipeline_collector.go
index ceb911d68..77415506f 100644
--- a/backend/plugins/gitlab/tasks/pipeline_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_collector.go
@@ -44,7 +44,7 @@ var CollectApiPipelinesMeta = plugin.SubTaskMeta{
func CollectApiPipelines(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_PIPELINE_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+ apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
@@ -54,7 +54,7 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ err = apiCollector.InitCollector(helper.ApiCollectorArgs{
RawDataSubTaskArgs: *rawDataSubTaskArgs,
ApiClient: data.ApiClient,
MinTickInterval: &tickInterval,
@@ -62,8 +62,8 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext)
errors.Error {
UrlTemplate: "projects/{{ .Params.ProjectId
}}/pipelines",
Query: func(reqData *helper.RequestData) (url.Values,
errors.Error) {
query := url.Values{}
- if collectorWithState.Since != nil {
- query.Set("updated_after",
collectorWithState.Since.Format(time.RFC3339))
+ if apiCollector.GetSince() != nil {
+ query.Set("updated_after",
apiCollector.GetSince().Format(time.RFC3339))
}
query.Set("with_stats", "true")
query.Set("sort", "asc")
@@ -78,5 +78,5 @@ func CollectApiPipelines(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
index 227d19e93..a4bed6b0d 100644
--- a/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
+++ b/backend/plugins/gitlab/tasks/pipeline_detail_collector.go
@@ -82,7 +82,7 @@ func CollectApiPipelineDetails(taskCtx plugin.SubTaskContext)
errors.Error {
return collectorWithState.Execute()
}
-func GetPipelinesIterator(taskCtx plugin.SubTaskContext, collectorWithState
*helper.ApiCollectorStateManager) (*helper.DalCursorIterator, errors.Error) {
+func GetPipelinesIterator(taskCtx plugin.SubTaskContext, apiCollector
*helper.StatefulApiCollector) (*helper.DalCursorIterator, errors.Error) {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*GitlabTaskData)
clauses := []dal.Clause{
@@ -93,8 +93,8 @@ func GetPipelinesIterator(taskCtx plugin.SubTaskContext,
collectorWithState *hel
data.Options.ProjectId, data.Options.ConnectionId,
),
}
- if collectorWithState.Since != nil && collectorWithState.IsIncremental {
- clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*collectorWithState.Since))
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+ clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
*apiCollector.GetSince()))
}
// construct the input iterator
cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/gitlab/tasks/shared.go
b/backend/plugins/gitlab/tasks/shared.go
index f20514961..4051c93b5 100644
--- a/backend/plugins/gitlab/tasks/shared.go
+++ b/backend/plugins/gitlab/tasks/shared.go
@@ -168,7 +168,7 @@ func CreateRawDataSubTaskArgs(taskCtx
plugin.SubTaskContext, Table string) (*hel
return rawDataSubTaskArgs, data
}
-func GetMergeRequestsIterator(taskCtx plugin.SubTaskContext,
collectorWithState *helper.ApiCollectorStateManager)
(*helper.DalCursorIterator, errors.Error) {
+func GetMergeRequestsIterator(taskCtx plugin.SubTaskContext, apiCollector
*helper.StatefulApiCollector) (*helper.DalCursorIterator, errors.Error) {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*GitlabTaskData)
clauses := []dal.Clause{
@@ -179,9 +179,9 @@ func GetMergeRequestsIterator(taskCtx
plugin.SubTaskContext, collectorWithState
data.Options.ProjectId, data.Options.ConnectionId,
),
}
- if collectorWithState != nil {
- if collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("gitlab_updated_at
> ?", *collectorWithState.Since))
+ if apiCollector != nil {
+ if apiCollector.GetSince() != nil {
+ clauses = append(clauses, dal.Where("gitlab_updated_at
> ?", *apiCollector.GetSince()))
}
}
// construct the input iterator
diff --git a/backend/plugins/gitlab/tasks/trigger_job_collector.go
b/backend/plugins/gitlab/tasks/trigger_job_collector.go
index 755eeb67a..37ceba215 100644
--- a/backend/plugins/gitlab/tasks/trigger_job_collector.go
+++ b/backend/plugins/gitlab/tasks/trigger_job_collector.go
@@ -44,7 +44,7 @@ var CollectApiTriggerJobsMeta = plugin.SubTaskMeta{
func CollectApiTriggerJobs(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_TRIGGER_JOB_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+ apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
@@ -52,17 +52,15 @@ func CollectApiTriggerJobs(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- iterator, err := GetAllPipelinesIterator(taskCtx, collectorWithState)
+ iterator, err := GetAllPipelinesIterator(taskCtx, apiCollector)
if err != nil {
return err
}
- incremental := collectorWithState.IsIncremental
- err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ err = apiCollector.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
MinTickInterval: &tickInterval,
PageSize: 100,
- Incremental: incremental,
Input: iterator,
UrlTemplate: "projects/{{ .Params.ProjectId }}/pipelines/{{
.Input.GitlabId }}/bridges",
ResponseParser: GetRawMessageFromResponse,
@@ -73,10 +71,10 @@ func CollectApiTriggerJobs(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
-func GetAllPipelinesIterator(taskCtx plugin.SubTaskContext, collectorWithState
*helper.ApiCollectorStateManager) (*helper.DalCursorIterator, errors.Error) {
+func GetAllPipelinesIterator(taskCtx plugin.SubTaskContext, apiCollector
*helper.StatefulApiCollector) (*helper.DalCursorIterator, errors.Error) {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*GitlabTaskData)
clauses := []dal.Clause{
@@ -87,8 +85,8 @@ func GetAllPipelinesIterator(taskCtx plugin.SubTaskContext,
collectorWithState *
data.Options.ProjectId, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncremental && collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
collectorWithState.Since))
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+ clauses = append(clauses, dal.Where("gitlab_updated_at > ?",
apiCollector.GetSince()))
}
// construct the input iterator
cursor, err := db.Cursor(clauses...)
diff --git a/backend/plugins/jenkins/tasks/stage_collector.go
b/backend/plugins/jenkins/tasks/stage_collector.go
index 4503d8560..30501065a 100644
--- a/backend/plugins/jenkins/tasks/stage_collector.go
+++ b/backend/plugins/jenkins/tasks/stage_collector.go
@@ -49,7 +49,7 @@ func CollectApiStages(taskCtx plugin.SubTaskContext)
errors.Error {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*JenkinsTaskData)
- collectorWithState, err :=
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+ apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
Params: JenkinsApiParams{
ConnectionId: data.Options.ConnectionId,
FullName: data.Options.JobFullName,
@@ -67,8 +67,8 @@ func CollectApiStages(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where(`tjb.connection_id = ? and tjb.job_path = ? and
tjb.job_name = ? and tjb.class = ?`,
data.Options.ConnectionId, data.Options.JobPath,
data.Options.JobName, "WorkflowRun"),
}
- if collectorWithState.IsIncremental && collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where(`tjb.start_time >= ?`,
collectorWithState.Since))
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+ clauses = append(clauses, dal.Where(`tjb.start_time >= ?`,
apiCollector.GetSince()))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -81,7 +81,7 @@ func CollectApiStages(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+ err = apiCollector.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
Input: iterator,
UrlTemplate: fmt.Sprintf("%sjob/%s/{{ .Input.Number
}}/wfapi/describe", data.Options.JobPath, data.Options.JobName),
@@ -109,5 +109,5 @@ func CollectApiStages(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/jira/tasks/development_panel_collector.go
b/backend/plugins/jira/tasks/development_panel_collector.go
index c513c5bf5..1381b5475 100644
--- a/backend/plugins/jira/tasks/development_panel_collector.go
+++ b/backend/plugins/jira/tasks/development_panel_collector.go
@@ -53,7 +53,7 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext)
errors.Error {
}
db := taskCtx.GetDal()
logger := taskCtx.GetLogger()
- collectorWithState, err :=
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+ apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: JiraApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -72,8 +72,8 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ?",
data.Options.ConnectionId, data.Options.BoardId),
}
- if collectorWithState.IsIncremental && collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+ clauses = append(clauses, dal.Where("i.updated > ?",
apiCollector.GetSince()))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -87,7 +87,7 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+ err = apiCollector.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
Input: iterator,
// the URL looks like:
@@ -122,5 +122,5 @@ func CollectDevelopmentPanel(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/jira/tasks/epic_collector.go
b/backend/plugins/jira/tasks/epic_collector.go
index 25cbd3998..f7f79ddf2 100644
--- a/backend/plugins/jira/tasks/epic_collector.go
+++ b/backend/plugins/jira/tasks/epic_collector.go
@@ -59,7 +59,7 @@ func CollectEpics(taskCtx plugin.SubTaskContext) errors.Error
{
return err
}
- collectorWithState, err :=
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+ apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: JiraApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -78,11 +78,11 @@ func CollectEpics(taskCtx plugin.SubTaskContext)
errors.Error {
logger.Info("got user's timezone: %v", loc.String())
}
jql := "ORDER BY created ASC"
- if collectorWithState.Since != nil {
- jql = "and " + buildJQL(*collectorWithState.Since, loc)
+ if apiCollector.GetSince() != nil {
+ jql = "and " + buildJQL(*apiCollector.GetSince(), loc)
}
- err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+ err = apiCollector.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
Incremental: false,
@@ -123,7 +123,7 @@ func CollectEpics(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
func GetEpicKeysIterator(db dal.Dal, data *JiraTaskData, batchSize int)
(api.Iterator, errors.Error) {
diff --git a/backend/plugins/jira/tasks/issue_changelog_collector.go
b/backend/plugins/jira/tasks/issue_changelog_collector.go
index 9f29436c9..76c1eef38 100644
--- a/backend/plugins/jira/tasks/issue_changelog_collector.go
+++ b/backend/plugins/jira/tasks/issue_changelog_collector.go
@@ -53,7 +53,7 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
logger := taskCtx.GetLogger()
db := taskCtx.GetDal()
- collectorWithState, err :=
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+ apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: JiraApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -71,8 +71,8 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ? AND
i.std_type != ? and i.changelog_total > 100", data.Options.ConnectionId,
data.Options.BoardId, "Epic"),
}
- if collectorWithState.IsIncremental && collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+ clauses = append(clauses, dal.Where("i.updated > ?",
apiCollector.GetSince()))
}
if logger.IsLevelEnabled(log.LOG_DEBUG) {
@@ -95,7 +95,7 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
}
// now, let ApiCollector takes care the rest
- err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+ err = apiCollector.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
GetTotalPages: GetTotalPagesFromResponse,
@@ -125,5 +125,5 @@ func CollectIssueChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/jira/tasks/issue_collector.go
b/backend/plugins/jira/tasks/issue_collector.go
index 3faaa9d51..84ac2d727 100644
--- a/backend/plugins/jira/tasks/issue_collector.go
+++ b/backend/plugins/jira/tasks/issue_collector.go
@@ -48,7 +48,7 @@ var CollectIssuesMeta = plugin.SubTaskMeta{
func CollectIssues(taskCtx plugin.SubTaskContext) errors.Error {
data := taskCtx.GetData().(*JiraTaskData)
logger := taskCtx.GetLogger()
- collectorWithState, err :=
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+ apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
Ctx: taskCtx,
/*
This struct will be JSONEncoded and stored into
database along with raw data itself, to identity minimal
@@ -77,11 +77,11 @@ func CollectIssues(taskCtx plugin.SubTaskContext)
errors.Error {
logger.Info("got user's timezone: %v", loc.String())
}
jql := "ORDER BY created ASC"
- if collectorWithState.Since != nil {
- jql = buildJQL(*collectorWithState.Since, loc)
+ if apiCollector.GetSince() != nil {
+ jql = buildJQL(*apiCollector.GetSince(), loc)
}
- err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+ err = apiCollector.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: data.Options.PageSize,
/*
@@ -143,7 +143,7 @@ func CollectIssues(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
// buildJQL build jql based on timeAfter and incremental mode
diff --git a/backend/plugins/jira/tasks/issue_comment_collector.go
b/backend/plugins/jira/tasks/issue_comment_collector.go
index aa6ca2cdd..5ef2777be 100644
--- a/backend/plugins/jira/tasks/issue_comment_collector.go
+++ b/backend/plugins/jira/tasks/issue_comment_collector.go
@@ -53,7 +53,7 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext)
errors.Error {
logger := taskCtx.GetLogger()
db := taskCtx.GetDal()
- collectorWithState, err :=
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+ apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: JiraApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -71,8 +71,8 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ? AND
i.std_type != ? AND i.comment_total > 100", data.Options.ConnectionId,
data.Options.BoardId, "Epic"),
}
- if collectorWithState.IsIncremental && collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+ clauses = append(clauses, dal.Where("i.updated > ?",
apiCollector.GetSince()))
}
if logger.IsLevelEnabled(log.LOG_DEBUG) {
count, err := db.Count(clauses...)
@@ -94,7 +94,7 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext)
errors.Error {
}
// now, let ApiCollector takes care the rest
- err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+ err = apiCollector.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: 100,
GetTotalPages: GetTotalPagesFromResponse,
@@ -124,5 +124,5 @@ func CollectIssueComments(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/jira/tasks/remotelink_collector.go
b/backend/plugins/jira/tasks/remotelink_collector.go
index 2072b1f28..6b44ca7e5 100644
--- a/backend/plugins/jira/tasks/remotelink_collector.go
+++ b/backend/plugins/jira/tasks/remotelink_collector.go
@@ -52,7 +52,7 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext)
errors.Error {
logger := taskCtx.GetLogger()
logger.Info("collect remotelink")
- collectorWithState, err :=
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+ apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: JiraApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -70,8 +70,8 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Join("LEFT JOIN _tool_jira_issues i ON (bi.connection_id =
i.connection_id AND bi.issue_id = i.issue_id)"),
dal.Where("bi.connection_id=? and bi.board_id = ?",
data.Options.ConnectionId, data.Options.BoardId),
}
- if collectorWithState.IsIncremental && collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("i.updated > ?",
collectorWithState.Since))
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+ clauses = append(clauses, dal.Where("i.updated > ?",
apiCollector.GetSince()))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -85,7 +85,7 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+ err = apiCollector.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
Input: iterator,
UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/remotelink",
@@ -105,7 +105,7 @@ func CollectRemotelinks(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- err = collectorWithState.Execute()
+ err = apiCollector.Execute()
if err != nil {
return err
}
diff --git a/backend/plugins/jira/tasks/worklog_collector.go
b/backend/plugins/jira/tasks/worklog_collector.go
index a22005bc3..8112a6d0a 100644
--- a/backend/plugins/jira/tasks/worklog_collector.go
+++ b/backend/plugins/jira/tasks/worklog_collector.go
@@ -45,7 +45,7 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
logger := taskCtx.GetLogger()
- collectorWithState, err :=
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+ apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
Ctx: taskCtx,
Params: JiraApiParams{
ConnectionId: data.Options.ConnectionId,
@@ -66,8 +66,14 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where("i.updated > i.created AND bi.connection_id = ? AND
bi.board_id = ? ", data.Options.ConnectionId, data.Options.BoardId),
dal.Groupby("i.issue_id, i.updated"),
}
- if collectorWithState.IsIncremental && collectorWithState.Since != nil {
- clauses = append(clauses, dal.Having("i.updated > ? AND
(i.updated > max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND
COUNT(wl.worklog_id) > 0))", collectorWithState.Since))
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+ clauses = append(
+ clauses,
+ dal.Having(
+ "i.updated > ? AND (i.updated >
max(wl.issue_updated) OR (max(wl.issue_updated) IS NULL AND
COUNT(wl.worklog_id) > 0))",
+ apiCollector.GetSince(),
+ ),
+ )
} else {
/*
i.updated > max(wl.issue_updated) was deleted because
for non-incremental collection,
@@ -89,7 +95,7 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+ err = apiCollector.InitCollector(api.ApiCollectorArgs{
Input: iterator,
ApiClient: data.ApiClient,
UrlTemplate: "api/2/issue/{{ .Input.IssueId }}/worklog",
@@ -112,5 +118,5 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
diff --git a/backend/plugins/tapd/tasks/bug_changelog_collector.go
b/backend/plugins/tapd/tasks/bug_changelog_collector.go
index bf768e0d1..0822eeee9 100644
--- a/backend/plugins/tapd/tasks/bug_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/bug_changelog_collector.go
@@ -34,12 +34,12 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_BUG_CHANGELOG_TABLE)
logger := taskCtx.GetLogger()
logger.Info("collect storyChangelogs")
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+ apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
- err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ err = apiCollector.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "bug_changes",
@@ -49,8 +49,8 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created desc")
- if collectorWithState.Since != nil {
- query.Set("created", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ if apiCollector.GetSince() != nil {
+ query.Set("created", fmt.Sprintf(">%s",
apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
@@ -60,7 +60,7 @@ func CollectBugChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
logger.Error(err, "collect story changelog error")
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
var CollectBugChangelogMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/tapd/tasks/bug_collector.go
b/backend/plugins/tapd/tasks/bug_collector.go
index 746c1bd89..56479b139 100644
--- a/backend/plugins/tapd/tasks/bug_collector.go
+++ b/backend/plugins/tapd/tasks/bug_collector.go
@@ -34,12 +34,12 @@ func CollectBugs(taskCtx plugin.SubTaskContext)
errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_BUG_TABLE)
logger := taskCtx.GetLogger()
logger.Info("collect bugs")
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+ apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
- err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ err = apiCollector.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "bugs",
@@ -50,8 +50,8 @@ func CollectBugs(taskCtx plugin.SubTaskContext) errors.Error {
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- if collectorWithState.Since != nil {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ if apiCollector.GetSince() != nil {
+ query.Set("modified", fmt.Sprintf(">%s",
apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
@@ -61,7 +61,7 @@ func CollectBugs(taskCtx plugin.SubTaskContext) errors.Error {
logger.Error(err, "collect bug error")
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
var CollectBugMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/tapd/tasks/bug_commit_collector.go
b/backend/plugins/tapd/tasks/bug_commit_collector.go
index 3164adb05..f3abeafe8 100644
--- a/backend/plugins/tapd/tasks/bug_commit_collector.go
+++ b/backend/plugins/tapd/tasks/bug_commit_collector.go
@@ -38,7 +38,7 @@ var _ plugin.SubTaskEntryPoint = CollectBugCommits
func CollectBugCommits(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_BUG_COMMIT_TABLE)
db := taskCtx.GetDal()
- collectorWithState, err :=
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
+ apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
@@ -50,8 +50,8 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext)
errors.Error {
dal.From(&models.TapdBug{}),
dal.Where("_tool_tapd_bugs.connection_id = ? and
_tool_tapd_bugs.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.Since))
+ if apiCollector.GetSince() != nil {
+ clauses = append(clauses, dal.Where("modified > ?",
*apiCollector.GetSince()))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -62,7 +62,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+ err = apiCollector.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
Input: iterator,
UrlTemplate: "code_commit_infos",
@@ -87,7 +87,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext)
errors.Error {
logger.Error(err, "collect issueCommit error")
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
var CollectBugCommitMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/tapd/tasks/iteration_collector.go
b/backend/plugins/tapd/tasks/iteration_collector.go
index 5f5512b1f..1fc7b72aa 100644
--- a/backend/plugins/tapd/tasks/iteration_collector.go
+++ b/backend/plugins/tapd/tasks/iteration_collector.go
@@ -36,12 +36,12 @@ func CollectIterations(taskCtx plugin.SubTaskContext)
errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_ITERATION_TABLE)
logger := taskCtx.GetLogger()
logger.Info("collect iterations")
- collectorWithState, err :=
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
+ apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
- err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+ err = apiCollector.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
Concurrency: 3,
@@ -52,8 +52,8 @@ func CollectIterations(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if collectorWithState.Since != nil {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ if apiCollector.GetSince() != nil {
+ query.Set("modified", fmt.Sprintf(">%s",
apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
@@ -69,7 +69,7 @@ func CollectIterations(taskCtx plugin.SubTaskContext)
errors.Error {
logger.Error(err, "collect iteration error")
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
var CollectIterationMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/tapd/tasks/story_bug_collector.go
b/backend/plugins/tapd/tasks/story_bug_collector.go
index 9d5d0ed0c..315498242 100644
--- a/backend/plugins/tapd/tasks/story_bug_collector.go
+++ b/backend/plugins/tapd/tasks/story_bug_collector.go
@@ -36,7 +36,7 @@ var _ plugin.SubTaskEntryPoint = CollectStoryBugs
func CollectStoryBugs(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_STORY_BUG_TABLE)
db := taskCtx.GetDal()
- collectorWithState, err :=
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
+ apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
@@ -48,8 +48,8 @@ func CollectStoryBugs(taskCtx plugin.SubTaskContext)
errors.Error {
dal.From(&models.TapdStory{}),
dal.Where("_tool_tapd_stories.connection_id = ? and
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.Since))
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+ clauses = append(clauses, dal.Where("modified > ?",
*apiCollector.GetSince()))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -59,7 +59,7 @@ func CollectStoryBugs(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+ err = apiCollector.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
Input: iterator,
UrlTemplate: "stories/get_related_bugs",
@@ -76,7 +76,7 @@ func CollectStoryBugs(taskCtx plugin.SubTaskContext)
errors.Error {
logger.Error(err, "collect storyBug error")
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
var CollectStoryBugMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/tapd/tasks/story_changelog_collector.go
b/backend/plugins/tapd/tasks/story_changelog_collector.go
index d0597b7ef..83d195e26 100644
--- a/backend/plugins/tapd/tasks/story_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/story_changelog_collector.go
@@ -49,8 +49,8 @@ func CollectStoryChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if collectorWithState.Since != nil {
- query.Set("created", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ if collectorWithState.GetSince() != nil {
+ query.Set("created", fmt.Sprintf(">%s",
collectorWithState.GetSince().In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/backend/plugins/tapd/tasks/story_collector.go
b/backend/plugins/tapd/tasks/story_collector.go
index 095c72b3d..f030abbef 100644
--- a/backend/plugins/tapd/tasks/story_collector.go
+++ b/backend/plugins/tapd/tasks/story_collector.go
@@ -34,12 +34,12 @@ func CollectStorys(taskCtx plugin.SubTaskContext)
errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_STORY_TABLE)
logger := taskCtx.GetLogger()
logger.Info("collect stories")
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+ apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
- err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ err = apiCollector.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "stories",
@@ -50,8 +50,8 @@ func CollectStorys(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- if collectorWithState.Since != nil {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ if apiCollector.GetSince() != nil {
+ query.Set("modified", fmt.Sprintf(">%s",
apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
@@ -61,7 +61,7 @@ func CollectStorys(taskCtx plugin.SubTaskContext)
errors.Error {
logger.Error(err, "collect story error")
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
var CollectStoryMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/tapd/tasks/story_commit_collector.go
b/backend/plugins/tapd/tasks/story_commit_collector.go
index 708c71010..21044f3be 100644
--- a/backend/plugins/tapd/tasks/story_commit_collector.go
+++ b/backend/plugins/tapd/tasks/story_commit_collector.go
@@ -38,7 +38,7 @@ var _ plugin.SubTaskEntryPoint = CollectStoryCommits
func CollectStoryCommits(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_STORY_COMMIT_TABLE)
db := taskCtx.GetDal()
- collectorWithState, err :=
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
+ apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
@@ -49,8 +49,8 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext)
errors.Error {
dal.From(&models.TapdStory{}),
dal.Where("_tool_tapd_stories.connection_id = ? and
_tool_tapd_stories.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.Since))
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+ clauses = append(clauses, dal.Where("modified > ?",
*apiCollector.GetSince()))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -60,7 +60,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+ err = apiCollector.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
Input: iterator,
UrlTemplate: "code_commit_infos",
@@ -85,7 +85,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext)
errors.Error {
logger.Error(err, "collect issueCommit error")
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
var CollectStoryCommitMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/tapd/tasks/task_changelog_collector.go
b/backend/plugins/tapd/tasks/task_changelog_collector.go
index 7aba00c2a..66aa60b9e 100644
--- a/backend/plugins/tapd/tasks/task_changelog_collector.go
+++ b/backend/plugins/tapd/tasks/task_changelog_collector.go
@@ -32,13 +32,13 @@ var _ plugin.SubTaskEntryPoint = CollectTaskChangelogs
func CollectTaskChangelogs(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_TASK_CHANGELOG_TABLE)
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+ apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
logger := taskCtx.GetLogger()
logger.Info("collect taskChangelogs")
- err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ err = apiCollector.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "task_changes",
@@ -48,8 +48,8 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if collectorWithState.Since != nil {
- query.Set("created", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ if apiCollector.GetSince() != nil {
+ query.Set("created", fmt.Sprintf(">%s",
apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
@@ -59,7 +59,7 @@ func CollectTaskChangelogs(taskCtx plugin.SubTaskContext)
errors.Error {
logger.Error(err, "collect task changelog error")
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
var CollectTaskChangelogMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/tapd/tasks/task_collector.go
b/backend/plugins/tapd/tasks/task_collector.go
index 2d5bfdba3..9b5beb3a4 100644
--- a/backend/plugins/tapd/tasks/task_collector.go
+++ b/backend/plugins/tapd/tasks/task_collector.go
@@ -34,7 +34,7 @@ func CollectTasks(taskCtx plugin.SubTaskContext) errors.Error
{
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_TASK_TABLE)
logger := taskCtx.GetLogger()
logger.Info("collect tasks")
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+ apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
@@ -51,8 +51,8 @@ func CollectTasks(taskCtx plugin.SubTaskContext) errors.Error
{
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("fields", "labels")
query.Set("order", "created asc")
- if collectorWithState.Since != nil {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ if apiCollector.GetSince() != nil {
+ query.Set("modified", fmt.Sprintf(">%s",
apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
diff --git a/backend/plugins/tapd/tasks/task_commit_collector.go
b/backend/plugins/tapd/tasks/task_commit_collector.go
index 0ef938997..56c374d3a 100644
--- a/backend/plugins/tapd/tasks/task_commit_collector.go
+++ b/backend/plugins/tapd/tasks/task_commit_collector.go
@@ -38,7 +38,7 @@ var _ plugin.SubTaskEntryPoint = CollectTaskCommits
func CollectTaskCommits(taskCtx plugin.SubTaskContext) errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_TASK_COMMIT_TABLE)
db := taskCtx.GetDal()
- collectorWithState, err :=
api.NewStatefulApiCollector(*rawDataSubTaskArgs)
+ apiCollector, err := api.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
@@ -49,8 +49,8 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext)
errors.Error {
dal.From(&models.TapdTask{}),
dal.Where("_tool_tapd_tasks.connection_id = ? and
_tool_tapd_tasks.workspace_id = ? ", data.Options.ConnectionId,
data.Options.WorkspaceId),
}
- if collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("modified > ?",
*collectorWithState.Since))
+ if apiCollector.GetSince() != nil {
+ clauses = append(clauses, dal.Where("modified > ?",
*apiCollector.GetSince()))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -60,7 +60,7 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+ err = apiCollector.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
Input: iterator,
UrlTemplate: "code_commit_infos",
@@ -85,7 +85,7 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext)
errors.Error {
logger.Error(err, "collect issueCommit error")
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
var CollectTaskCommitMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/tapd/tasks/worklog_collector.go
b/backend/plugins/tapd/tasks/worklog_collector.go
index b091f9e44..c651a2038 100644
--- a/backend/plugins/tapd/tasks/worklog_collector.go
+++ b/backend/plugins/tapd/tasks/worklog_collector.go
@@ -34,12 +34,12 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
rawDataSubTaskArgs, data := CreateRawDataSubTaskArgs(taskCtx,
RAW_WORKLOG_TABLE)
logger := taskCtx.GetLogger()
logger.Info("collect worklogs")
- collectorWithState, err :=
helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
+ apiCollector, err := helper.NewStatefulApiCollector(*rawDataSubTaskArgs)
if err != nil {
return err
}
- err = collectorWithState.InitCollector(helper.ApiCollectorArgs{
+ err = apiCollector.InitCollector(helper.ApiCollectorArgs{
ApiClient: data.ApiClient,
PageSize: int(data.Options.PageSize),
UrlTemplate: "timesheets",
@@ -49,8 +49,8 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
query.Set("page", fmt.Sprintf("%v", reqData.Pager.Page))
query.Set("limit", fmt.Sprintf("%v",
reqData.Pager.Size))
query.Set("order", "created asc")
- if collectorWithState.Since != nil {
- query.Set("modified", fmt.Sprintf(">%s",
collectorWithState.Since.In(data.Options.CstZone).Format("2006-01-02")))
+ if apiCollector.GetSince() != nil {
+ query.Set("modified", fmt.Sprintf(">%s",
apiCollector.GetSince().In(data.Options.CstZone).Format("2006-01-02")))
}
return query, nil
},
@@ -60,7 +60,7 @@ func CollectWorklogs(taskCtx plugin.SubTaskContext)
errors.Error {
logger.Error(err, "collect worklog error")
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
var CollectWorklogMeta = plugin.SubTaskMeta{
diff --git a/backend/plugins/zentao/tasks/bug_commits_collector.go
b/backend/plugins/zentao/tasks/bug_commits_collector.go
index c1ee2f74e..bf2a44116 100644
--- a/backend/plugins/zentao/tasks/bug_commits_collector.go
+++ b/backend/plugins/zentao/tasks/bug_commits_collector.go
@@ -52,7 +52,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext)
errors.Error {
data := taskCtx.GetData().(*ZentaoTaskData)
// state manager
- collectorWithState, err :=
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+ apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
Ctx: taskCtx,
Options: data.Options,
Table: RAW_BUG_COMMITS_TABLE,
@@ -70,8 +70,8 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext)
errors.Error {
data.Options.ProjectId, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncremental && collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("last_edited_date is not
null and last_edited_date > ?", collectorWithState.Since))
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+ clauses = append(clauses, dal.Where("last_edited_date is not
null and last_edited_date > ?", apiCollector.GetSince()))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -82,7 +82,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
// collect bug commits
- err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+ err = apiCollector.InitCollector(api.ApiCollectorArgs{
RawDataSubTaskArgs: api.RawDataSubTaskArgs{
Ctx: taskCtx,
Options: data.Options,
@@ -111,7 +111,7 @@ func CollectBugCommits(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
type SimpleZentaoBug struct {
diff --git a/backend/plugins/zentao/tasks/story_commits_collector.go
b/backend/plugins/zentao/tasks/story_commits_collector.go
index 489ad2945..85404a39f 100644
--- a/backend/plugins/zentao/tasks/story_commits_collector.go
+++ b/backend/plugins/zentao/tasks/story_commits_collector.go
@@ -47,7 +47,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext)
errors.Error {
data := taskCtx.GetData().(*ZentaoTaskData)
// state manager
- collectorWithState, err :=
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+ apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
Ctx: taskCtx,
Options: data.Options,
Table: RAW_STORY_COMMITS_TABLE,
@@ -66,8 +66,8 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext)
errors.Error {
dal.Where(`_tool_zentao_project_stories.project_id = ? and
_tool_zentao_project_stories.connection_id = ?`,
data.Options.ProjectId, data.Options.ConnectionId),
}
- if collectorWithState.IsIncremental && collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("last_edited_date is not
null and last_edited_date > ?", collectorWithState.Since))
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+ clauses = append(clauses, dal.Where("last_edited_date is not
null and last_edited_date > ?", apiCollector.GetSince()))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -80,7 +80,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext)
errors.Error {
}
// collect story commits
- err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+ err = apiCollector.InitCollector(api.ApiCollectorArgs{
RawDataSubTaskArgs: api.RawDataSubTaskArgs{
Ctx: taskCtx,
Options: data.Options,
@@ -109,7 +109,7 @@ func CollectStoryCommits(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}
type inputWithLastEditedDate struct {
diff --git a/backend/plugins/zentao/tasks/task_commits_collector.go
b/backend/plugins/zentao/tasks/task_commits_collector.go
index 5a3204302..219a500fd 100644
--- a/backend/plugins/zentao/tasks/task_commits_collector.go
+++ b/backend/plugins/zentao/tasks/task_commits_collector.go
@@ -46,7 +46,7 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext)
errors.Error {
data := taskCtx.GetData().(*ZentaoTaskData)
// state manager
- collectorWithState, err :=
api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
+ apiCollector, err := api.NewStatefulApiCollector(api.RawDataSubTaskArgs{
Ctx: taskCtx,
Options: data.Options,
Table: RAW_TASK_COMMITS_TABLE,
@@ -64,8 +64,8 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext)
errors.Error {
data.Options.ProjectId, data.Options.ConnectionId,
),
}
- if collectorWithState.IsIncremental && collectorWithState.Since != nil {
- clauses = append(clauses, dal.Where("last_edited_date is not
null and last_edited_date > ?", collectorWithState.Since))
+ if apiCollector.IsIncremental() && apiCollector.GetSince() != nil {
+ clauses = append(clauses, dal.Where("last_edited_date is not
null and last_edited_date > ?", apiCollector.GetSince()))
}
cursor, err := db.Cursor(clauses...)
if err != nil {
@@ -78,7 +78,7 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext)
errors.Error {
}
// collect task commits
- err = collectorWithState.InitCollector(api.ApiCollectorArgs{
+ err = apiCollector.InitCollector(api.ApiCollectorArgs{
RawDataSubTaskArgs: api.RawDataSubTaskArgs{
Ctx: taskCtx,
Options: data.Options,
@@ -107,5 +107,5 @@ func CollectTaskCommits(taskCtx plugin.SubTaskContext)
errors.Error {
return err
}
- return collectorWithState.Execute()
+ return apiCollector.Execute()
}