This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new ac3b1d9cb feat: jira epic collection / extraction support incremental
mode for better performance (#8414)
ac3b1d9cb is described below
commit ac3b1d9cb317e6a08b1412ddef0de7bbb9b97c9e
Author: Klesh Wong <[email protected]>
AuthorDate: Tue Apr 29 16:04:12 2025 +0800
feat: jira epic collection / extraction support incremental mode for better
performance (#8414)
---
backend/core/config/config_viper.go | 1 +
backend/plugins/jira/tasks/epic_collector.go | 9 ++++--
backend/plugins/jira/tasks/epic_extractor.go | 47 +++++++++++++++++++---------
backend/server/api/api.go | 1 +
backend/server/services/pipeline.go | 7 +++--
5 files changed, 45 insertions(+), 20 deletions(-)
diff --git a/backend/core/config/config_viper.go
b/backend/core/config/config_viper.go
index 16060c46b..571439e47 100644
--- a/backend/core/config/config_viper.go
+++ b/backend/core/config/config_viper.go
@@ -106,6 +106,7 @@ func setDefaultValue(v *viper.Viper) {
v.SetDefault("SWAGGER_DOCS_DIR", "resources/swagger")
v.SetDefault("RESUME_PIPELINES", true)
v.SetDefault("CORS_ALLOW_ORIGIN", "*")
+ v.SetDefault("CONSUME_PIPELINES", true)
}
func init() {
diff --git a/backend/plugins/jira/tasks/epic_collector.go
b/backend/plugins/jira/tasks/epic_collector.go
index 959b0e3c8..dcb163309 100644
--- a/backend/plugins/jira/tasks/epic_collector.go
+++ b/backend/plugins/jira/tasks/epic_collector.go
@@ -78,6 +78,9 @@ func CollectEpics(taskCtx plugin.SubTaskContext) errors.Error
{
logger.Info("got user's timezone: %v", loc.String())
}
jql := "ORDER BY created ASC"
+ if apiCollector.GetSince() != nil {
+ jql = buildJQL(*apiCollector.GetSince(), loc)
+ }
err = apiCollector.InitCollector(api.ApiCollectorArgs{
ApiClient: data.ApiClient,
@@ -90,7 +93,7 @@ func CollectEpics(taskCtx plugin.SubTaskContext) errors.Error
{
for _, e := range reqData.Input.([]interface{}) {
epicKeys = append(epicKeys, *e.(*string))
}
- localJQL := fmt.Sprintf("issue in (%s) %s",
strings.Join(epicKeys, ","), jql)
+ localJQL := fmt.Sprintf("issue in (%s) and %s",
strings.Join(epicKeys, ","), jql)
query.Set("jql", localJQL)
query.Set("startAt", fmt.Sprintf("%v",
reqData.Pager.Skip))
query.Set("maxResults", fmt.Sprintf("%v",
reqData.Pager.Size))
@@ -130,12 +133,12 @@ func GetEpicKeysIterator(db dal.Dal, data *JiraTaskData,
batchSize int) (api.Ite
dal.Join(`
LEFT JOIN _tool_jira_board_issues bi ON (
i.connection_id = bi.connection_id
- AND
+ AND
i.issue_id = bi.issue_id
)`),
dal.Where(`
i.connection_id = ?
- AND
+ AND
bi.board_id = ?
AND
i.epic_key != ''
diff --git a/backend/plugins/jira/tasks/epic_extractor.go
b/backend/plugins/jira/tasks/epic_extractor.go
index 89f2ef70c..fa7a4d741 100644
--- a/backend/plugins/jira/tasks/epic_extractor.go
+++ b/backend/plugins/jira/tasks/epic_extractor.go
@@ -18,8 +18,6 @@ limitations under the License.
package tasks
import (
- "encoding/json"
-
"github.com/apache/incubator-devlake/core/dal"
"github.com/apache/incubator-devlake/core/errors"
"github.com/apache/incubator-devlake/core/log"
@@ -39,12 +37,12 @@ var ExtractEpicsMeta = plugin.SubTaskMeta{
DomainTypes: []string{plugin.DOMAIN_TYPE_TICKET,
plugin.DOMAIN_TYPE_CROSS},
}
-func ExtractEpics(taskCtx plugin.SubTaskContext) errors.Error {
- data := taskCtx.GetData().(*JiraTaskData)
- db := taskCtx.GetDal()
+func ExtractEpics(subtaskCtx plugin.SubTaskContext) errors.Error {
+ data := subtaskCtx.GetData().(*JiraTaskData)
+ db := subtaskCtx.GetDal()
connectionId := data.Options.ConnectionId
boardId := data.Options.BoardId
- logger := taskCtx.GetLogger()
+ logger := subtaskCtx.GetLogger()
logger.Info("extract external epic Issues, connection_id=%d,
board_id=%d", connectionId, boardId)
mappings, err := getTypeMappings(data, db)
if err != nil {
@@ -54,21 +52,40 @@ func ExtractEpics(taskCtx plugin.SubTaskContext)
errors.Error {
if err != nil {
return err
}
- extractor, err := api.NewApiExtractor(api.ApiExtractorArgs{
- RawDataSubTaskArgs: api.RawDataSubTaskArgs{
- Ctx: taskCtx,
+
+ extractor, err :=
api.NewStatefulApiExtractor(&api.StatefulApiExtractorArgs[apiv2models.Issue]{
+ SubtaskCommonArgs: &api.SubtaskCommonArgs{
+ SubTaskContext: subtaskCtx,
+ Table: RAW_EPIC_TABLE,
Params: JiraApiParams{
ConnectionId: data.Options.ConnectionId,
BoardId: data.Options.BoardId,
},
- Table: RAW_EPIC_TABLE,
+ SubtaskConfig: map[string]any{
+ "typeMappings": mappings,
+ "storyPointField":
data.Options.ScopeConfig.StoryPointField,
+ },
},
- Extract: func(row *api.RawData) ([]interface{}, errors.Error) {
- apiIssue := &apiv2models.Issue{}
- err = errors.Convert(json.Unmarshal(row.Data, apiIssue))
- if err != nil {
- return nil, err
+ BeforeExtract: func(apiIssue *apiv2models.Issue, stateManager
*api.SubtaskStateManager) errors.Error {
+ if stateManager.IsIncremental() {
+ err := db.Delete(
+ &models.JiraIssueLabel{},
+ dal.Where("connection_id = ? AND
issue_id = ?", data.Options.ConnectionId, apiIssue.ID),
+ )
+ if err != nil {
+ return err
+ }
+ err = db.Delete(
+ &models.JiraIssueRelationship{},
+ dal.Where("connection_id = ? AND
issue_id = ?", data.Options.ConnectionId, apiIssue.ID),
+ )
+ if err != nil {
+ return err
+ }
}
+ return nil
+ },
+ Extract: func(apiIssue *apiv2models.Issue, row *api.RawData)
([]interface{}, errors.Error) {
return extractIssues(data, mappings, apiIssue, row,
userFieldMap)
},
})
diff --git a/backend/server/api/api.go b/backend/server/api/api.go
index a2b590be4..a407cdce4 100644
--- a/backend/server/api/api.go
+++ b/backend/server/api/api.go
@@ -118,6 +118,7 @@ func SetupApiServer(router *gin.Engine) {
// Required for `/projects/hello%20%2F%20world` to be parsed properly
with `/projects/:projectName`
// end up with `name = "hello / world"`
router.UseRawPath = true
+ // router.UnescapePathValues = false
// Endpoint to proceed database migration
router.GET("/proceed-db-migration", func(ctx *gin.Context) {
diff --git a/backend/server/services/pipeline.go
b/backend/server/services/pipeline.go
index f0c7f3133..41e8d8c28 100644
--- a/backend/server/services/pipeline.go
+++ b/backend/server/services/pipeline.go
@@ -20,7 +20,6 @@ package services
import (
"context"
"fmt"
- "golang.org/x/sync/errgroup"
"net/url"
"os"
"path/filepath"
@@ -28,6 +27,8 @@ import (
"sync"
"time"
+ "golang.org/x/sync/errgroup"
+
"github.com/spf13/cast"
"github.com/apache/incubator-devlake/core/dal"
@@ -107,7 +108,9 @@ func pipelineServiceInit() {
pipelineMaxParallel = 10000
}
// run pipeline with independent goroutine
- go RunPipelineInQueue(pipelineMaxParallel)
+ if cfg.GetBool("CONSUME_PIPELINES") {
+ go RunPipelineInQueue(pipelineMaxParallel)
+ }
}
func markInterruptedPipelineAs(status string) {