This is an automated email from the ASF dual-hosted git repository.
lynwee pushed a commit to branch release-v1.0
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/release-v1.0 by this push:
new c25114ed9 feat: collect Zentao issue-repo-commit from DB (#8185) (#8211)
c25114ed9 is described below
commit c25114ed936ffcc87cf460da436c05c4f5f2e0d1
Author: Chaojie Yan <[email protected]>
AuthorDate: Fri Nov 22 16:22:26 2024 +0800
feat: collect Zentao issue-repo-commit from DB (#8185) (#8211)
* feat: collect Zentao Story-Repo-Commit from DB
* feat: collect Zentao bug and task related repo commits from DB
---
backend/plugins/zentao/impl/impl.go | 9 +-
.../plugins/zentao/tasks/bug_repo_commits_dbget.go | 187 ++++++++++++++++++++
.../zentao/tasks/story_repo_commits_dbget.go | 192 +++++++++++++++++++++
.../zentao/tasks/task_repo_commits_dbget.go | 187 ++++++++++++++++++++
4 files changed, 569 insertions(+), 6 deletions(-)
diff --git a/backend/plugins/zentao/impl/impl.go b/backend/plugins/zentao/impl/impl.go
index e46f8a107..2c24a235d 100644
--- a/backend/plugins/zentao/impl/impl.go
+++ b/backend/plugins/zentao/impl/impl.go
@@ -135,8 +135,7 @@ func (p Zentao) SubTaskMetas() []plugin.SubTaskMeta {
tasks.CollectTaskCommitsMeta,
tasks.ExtractTaskCommitsMeta,
- tasks.CollectTaskRepoCommitsMeta,
- tasks.ExtractTaskRepoCommitsMeta,
+ tasks.DBGetTaskRepoCommitsMeta,
tasks.ConvertTaskRepoCommitsMeta,
// product
@@ -151,14 +150,12 @@ func (p Zentao) SubTaskMetas() []plugin.SubTaskMeta {
tasks.CollectStoryCommitsMeta,
tasks.ExtractStoryCommitsMeta,
- tasks.CollectStoryRepoCommitsMeta,
- tasks.ExtractStoryRepoCommitsMeta,
+ tasks.DBGetStoryRepoCommitsMeta,
tasks.ConvertStoryRepoCommitsMeta,
tasks.CollectBugCommitsMeta,
tasks.ExtractBugCommitsMeta,
- tasks.CollectBugRepoCommitsMeta,
- tasks.ExtractBugRepoCommitsMeta,
+ tasks.DBGetBugRepoCommitsMeta,
tasks.ConvertBugRepoCommitsMeta,
tasks.DBGetChangelogMeta,
diff --git a/backend/plugins/zentao/tasks/bug_repo_commits_dbget.go b/backend/plugins/zentao/tasks/bug_repo_commits_dbget.go
new file mode 100644
index 000000000..ea68f4221
--- /dev/null
+++ b/backend/plugins/zentao/tasks/bug_repo_commits_dbget.go
@@ -0,0 +1,187 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+ "encoding/json"
+ "reflect"
+ "strconv"
+
+ "github.com/apache/incubator-devlake/core/dal"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+ "github.com/apache/incubator-devlake/plugins/zentao/models"
+)
+
+// Compile-time check that DBGetBugRepoCommits matches plugin.SubTaskEntryPoint.
+var _ plugin.SubTaskEntryPoint = DBGetBugRepoCommits
+
+// DBGetBugRepoCommitsMeta is the subtask metadata whose entry point is
+// DBGetBugRepoCommits. It is enabled by default and tagged with the ticket
+// domain type.
+var DBGetBugRepoCommitsMeta = plugin.SubTaskMeta{
+ Name: "collectBugRepoCommits",
+ EntryPoint: DBGetBugRepoCommits,
+ EnabledByDefault: true,
+ Description: "Get bug commits data from Zentao database",
+ DomainTypes: []string{plugin.DOMAIN_TYPE_TICKET},
+}
+
+// DBGetBugRepoCommits is the subtask entry point that reads bug-to-commit
+// links for the configured project from the Zentao database (data.RemoteDb)
+// and saves them through a batch-save divider.
+//
+// It returns nil without doing anything when data.RemoteDb is nil. An error
+// from closing the divider is returned to the caller instead of being raised
+// as a panic; an error produced earlier in the function takes precedence.
+func DBGetBugRepoCommits(taskCtx plugin.SubTaskContext) (err errors.Error) {
+	data := taskCtx.GetData().(*ZentaoTaskData)
+
+	// skip if no RemoteDb
+	if data.RemoteDb == nil {
+		return nil
+	}
+
+	divider := api.NewBatchSaveDivider(taskCtx, 500, "", "")
+	defer func() {
+		// Report a close failure through the named result, but never mask
+		// an error that is already being returned.
+		if closeErr := divider.Close(); closeErr != nil && err == nil {
+			err = closeErr
+		}
+	}()
+	handler, handlerErr := newBugRepoCommitHandler(taskCtx, divider)
+	if handlerErr != nil {
+		return handlerErr
+	}
+	return handler.collectBugRepoCommit(
+		taskCtx.GetDal(),
+		data.RemoteDb,
+		data.Options.ProjectId,
+		data.Options.ConnectionId,
+	)
+}
+
+// bugRepoCommitHandler carries the state used while collecting bug repo
+// commits: the raw-data params string stamped on every saved row and the
+// batch saver that rows are added to.
+type bugRepoCommitHandler struct {
+ rawDataParams string
+ bugRepoCommitBachSave *api.BatchSave
+}
+
+// toolZentaoBug receives one row of the local bug-id query (column bug_id).
+type toolZentaoBug struct {
+ BugID int64 `gorm:"type:integer"`
+}
+
+// RemoteBugRepoCommit receives one row of the remote Zentao query that links
+// a bug (issue_id) to a repository path (repo_url) and a revision (commit_sha).
+// NOTE(review): the query in collectBugRepoCommit does not select a project
+// column, so Project appears to stay at its zero value there - confirm.
+type RemoteBugRepoCommit struct {
+ Project int64 `gorm:"type:integer"`
+ IssueID int64 `gorm:"type:integer"`
+ RepoUrl string
+ CommitSha string
+}
+
+// collectBugRepoCommit links the project's bugs to the commits recorded for
+// them in the Zentao database.
+//
+// It reads the distinct bug ids for projectId/connectionId from the local
+// _tool_zentao_bugs table via db, then asks rdb for 'gitcommited' actions on
+// those bugs, joined to zt_repohistory on the first 10 characters of the
+// revision and to zt_repo for the repository path. Every result row is added
+// to the batch saver as a models.ZentaoBugRepoCommit tagged with
+// h.rawDataParams, and the saver is flushed before returning.
+//
+// When no bug ids are found the remote query is skipped and nil is returned.
+func (h bugRepoCommitHandler) collectBugRepoCommit(db dal.Dal, rdb dal.Dal, projectId int64, connectionId uint64) errors.Error {
+	bugCursor, err := db.RawCursor(`
+		SELECT
+			DISTINCT id AS bug_id
+		FROM
+			_tool_zentao_bugs AS fact_bug
+		WHERE
+			fact_bug.project = ? AND
+			fact_bug.connection_id = ?
+	`, projectId, connectionId)
+	if err != nil {
+		return err
+	}
+	defer bugCursor.Close()
+	var bugIds []int64
+	for bugCursor.Next() {
+		var row toolZentaoBug
+		if err := db.Fetch(bugCursor, &row); err != nil {
+			return errors.Default.Wrap(err, "error fetching bugCursor")
+		}
+		bugIds = append(bugIds, row.BugID)
+	}
+	// Nothing to look up: do not send a query with an empty IN list.
+	if len(bugIds) == 0 {
+		return nil
+	}
+
+	remoteCursor, err := rdb.RawCursor(`
+		SELECT
+			DISTINCT dim_bug_commits.bug_id AS issue_id,
+			dim_repo.path AS repo_url,
+			dim_revision.revision AS commit_sha
+		FROM (
+			SELECT
+				fact_action.objectID AS bug_id,
+				fact_action.project AS project_id,
+				fact_action.product AS product_id,
+				fact_action.extra AS short_commit_hexsha
+			FROM
+				zt_action AS fact_action
+			WHERE
+				fact_action.objectType IN ('bug') AND
+				fact_action.objectID IN ? AND
+				fact_action.action IN ('gitcommited')
+		) AS dim_bug_commits
+		INNER JOIN (
+			SELECT
+				fact_repo_hist.repo AS repo_id,
+				fact_repo_hist.revision AS revision,
+				LEFT(fact_repo_hist.revision, 10) AS short_commit_hexsha
+			FROM
+				zt_repohistory AS fact_repo_hist
+		) AS dim_revision
+		ON
+			dim_bug_commits.short_commit_hexsha = dim_revision.short_commit_hexsha
+		INNER JOIN
+			zt_repo AS dim_repo
+		ON
+			dim_revision.repo_id = dim_repo.id
+	`, bugIds)
+	if err != nil {
+		return err
+	}
+	defer remoteCursor.Close()
+
+	for remoteCursor.Next() {
+		var remoteBugRepoCommit RemoteBugRepoCommit
+		if err := rdb.Fetch(remoteCursor, &remoteBugRepoCommit); err != nil {
+			return errors.Default.Wrap(err, "error fetching remoteCursor")
+		}
+		bugRepoCommit := &models.ZentaoBugRepoCommit{
+			ConnectionId: connectionId,
+			Project:      projectId,
+			RepoUrl:      remoteBugRepoCommit.RepoUrl,
+			CommitSha:    remoteBugRepoCommit.CommitSha,
+			// The model stores the issue id as a decimal string.
+			IssueId: strconv.FormatInt(remoteBugRepoCommit.IssueID, 10),
+		}
+		bugRepoCommit.NoPKModel.RawDataParams = h.rawDataParams
+		if err := h.bugRepoCommitBachSave.Add(bugRepoCommit); err != nil {
+			return err
+		}
+	}
+	return h.bugRepoCommitBachSave.Flush()
+}
+
+// newBugRepoCommitHandler builds the handler used by DBGetBugRepoCommits.
+//
+// It obtains the batch saver for models.ZentaoBugRepoCommit from divider,
+// serializes data.Options.GetParams() to JSON as the raw-data params string,
+// and deletes the ZentaoBugRepoCommit rows whose _raw_data_params equals that
+// string before any new rows are collected.
+//
+// A JSON marshaling failure is returned instead of ignored: otherwise the
+// empty string would be used as the delete filter and as the row tag.
+func newBugRepoCommitHandler(taskCtx plugin.SubTaskContext, divider *api.BatchSaveDivider) (*bugRepoCommitHandler, errors.Error) {
+	data := taskCtx.GetData().(*ZentaoTaskData)
+
+	bugRepoCommitBachSave, err := divider.ForType(reflect.TypeOf(&models.ZentaoBugRepoCommit{}))
+	if err != nil {
+		return nil, err
+	}
+	blob, marshalErr := json.Marshal(data.Options.GetParams())
+	if marshalErr != nil {
+		return nil, errors.Default.Wrap(marshalErr, "error marshaling raw data params")
+	}
+	rawDataParams := string(blob)
+	db := taskCtx.GetDal()
+	err = db.Delete(&models.ZentaoBugRepoCommit{}, dal.Where("_raw_data_params = ?", rawDataParams))
+	if err != nil {
+		return nil, err
+	}
+	return &bugRepoCommitHandler{
+		rawDataParams:         rawDataParams,
+		bugRepoCommitBachSave: bugRepoCommitBachSave,
+	}, nil
+}
diff --git a/backend/plugins/zentao/tasks/story_repo_commits_dbget.go b/backend/plugins/zentao/tasks/story_repo_commits_dbget.go
new file mode 100644
index 000000000..a3ee1120c
--- /dev/null
+++ b/backend/plugins/zentao/tasks/story_repo_commits_dbget.go
@@ -0,0 +1,192 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+ "encoding/json"
+ "reflect"
+ "strconv"
+
+ "github.com/apache/incubator-devlake/core/dal"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+ "github.com/apache/incubator-devlake/plugins/zentao/models"
+)
+
+// Compile-time check that DBGetStoryRepoCommits matches plugin.SubTaskEntryPoint.
+var _ plugin.SubTaskEntryPoint = DBGetStoryRepoCommits
+
+// DBGetStoryRepoCommitsMeta is the subtask metadata whose entry point is
+// DBGetStoryRepoCommits. It is enabled by default and tagged with the ticket
+// domain type.
+var DBGetStoryRepoCommitsMeta = plugin.SubTaskMeta{
+ Name: "collectStoryRepoCommits",
+ EntryPoint: DBGetStoryRepoCommits,
+ EnabledByDefault: true,
+ Description: "Get story commits data from Zentao database",
+ DomainTypes: []string{plugin.DOMAIN_TYPE_TICKET},
+}
+
+// DBGetStoryRepoCommits is the subtask entry point that reads story-to-commit
+// links for the configured project from the Zentao database (data.RemoteDb)
+// and saves them through a batch-save divider.
+//
+// It returns nil without doing anything when data.RemoteDb is nil. An error
+// from closing the divider is returned to the caller instead of being raised
+// as a panic; an error produced earlier in the function takes precedence.
+func DBGetStoryRepoCommits(taskCtx plugin.SubTaskContext) (err errors.Error) {
+	data := taskCtx.GetData().(*ZentaoTaskData)
+
+	// skip if no RemoteDb
+	if data.RemoteDb == nil {
+		return nil
+	}
+
+	divider := api.NewBatchSaveDivider(taskCtx, 500, "", "")
+	defer func() {
+		// Report a close failure through the named result, but never mask
+		// an error that is already being returned.
+		if closeErr := divider.Close(); closeErr != nil && err == nil {
+			err = closeErr
+		}
+	}()
+	handler, handlerErr := newStoryRepoCommitHandler(taskCtx, divider)
+	if handlerErr != nil {
+		return handlerErr
+	}
+	return handler.collectStoryRepoCommit(
+		taskCtx.GetDal(),
+		data.RemoteDb,
+		data.Options.ProjectId,
+		data.Options.ConnectionId,
+	)
+}
+
+// storyRepoCommitHandler carries the state used while collecting story repo
+// commits: the raw-data params string stamped on every saved row and the
+// batch saver that rows are added to.
+type storyRepoCommitHandler struct {
+ rawDataParams string
+ storyRepoCommitBachSave *api.BatchSave
+}
+
+// toolZentaoStory receives one row of the local story-id query (column story_id).
+type toolZentaoStory struct {
+ StoryID int64 `gorm:"type:integer"`
+}
+
+// RemoteStoryRepoCommit receives one row of the remote Zentao query that links
+// a story (issue_id) to a repository path (repo_url) and a revision (commit_sha).
+// NOTE(review): the query in collectStoryRepoCommit does not select a project
+// column, so Project appears to stay at its zero value there - confirm.
+type RemoteStoryRepoCommit struct {
+ Project int64 `gorm:"type:integer"`
+ IssueID int64 `gorm:"type:integer"`
+ RepoUrl string
+ CommitSha string
+}
+
+// collectStoryRepoCommit links the project's stories to the commits recorded
+// for them in the Zentao database.
+//
+// It reads the distinct story ids for projectId/connectionId from the local
+// _tool_zentao_project_stories table (left-joined to _tool_zentao_stories)
+// via db, then asks rdb for 'gitcommited' actions on those ids with object
+// type 'story' or 'requirement', joined to zt_repohistory on the first 10
+// characters of the revision and to zt_repo for the repository path. Every
+// result row is added to the batch saver as a models.ZentaoStoryRepoCommit
+// tagged with h.rawDataParams, and the saver is flushed before returning.
+//
+// When no story ids are found the remote query is skipped and nil is returned.
+func (h storyRepoCommitHandler) collectStoryRepoCommit(db dal.Dal, rdb dal.Dal, projectId int64, connectionId uint64) errors.Error {
+	storyCursor, err := db.RawCursor(`
+		SELECT
+			DISTINCT fact_project_story.story_id AS story_id
+		FROM
+			_tool_zentao_project_stories AS fact_project_story
+		LEFT JOIN
+			_tool_zentao_stories AS fact_story
+		ON
+			fact_project_story.story_id = fact_story.id AND
+			fact_project_story.connection_id = fact_story.connection_id
+		WHERE
+			fact_project_story.project_id = ? AND
+			fact_project_story.connection_id = ?
+	`, projectId, connectionId)
+	if err != nil {
+		return err
+	}
+	defer storyCursor.Close()
+	var storyIds []int64
+	for storyCursor.Next() {
+		var row toolZentaoStory
+		if err := db.Fetch(storyCursor, &row); err != nil {
+			return errors.Default.Wrap(err, "error fetching storyCursor")
+		}
+		storyIds = append(storyIds, row.StoryID)
+	}
+	// Nothing to look up: do not send a query with an empty IN list.
+	if len(storyIds) == 0 {
+		return nil
+	}
+
+	remoteCursor, err := rdb.RawCursor(`
+		SELECT
+			DISTINCT dim_story_commits.story_id AS issue_id,
+			dim_repo.path AS repo_url,
+			dim_revision.revision AS commit_sha
+		FROM (
+			SELECT
+				fact_action.objectID AS story_id,
+				fact_action.project AS project_id,
+				fact_action.product AS product_id,
+				fact_action.extra AS short_commit_hexsha
+			FROM
+				zt_action AS fact_action
+			WHERE
+				fact_action.objectType IN ('story', 'requirement') AND
+				fact_action.objectID IN ? AND
+				fact_action.action IN ('gitcommited')
+		) AS dim_story_commits
+		INNER JOIN (
+			SELECT
+				fact_repo_hist.repo AS repo_id,
+				fact_repo_hist.revision AS revision,
+				LEFT(fact_repo_hist.revision, 10) AS short_commit_hexsha
+			FROM
+				zt_repohistory AS fact_repo_hist
+		) AS dim_revision
+		ON
+			dim_story_commits.short_commit_hexsha = dim_revision.short_commit_hexsha
+		INNER JOIN
+			zt_repo AS dim_repo
+		ON
+			dim_revision.repo_id = dim_repo.id
+	`, storyIds)
+	if err != nil {
+		return err
+	}
+	defer remoteCursor.Close()
+
+	for remoteCursor.Next() {
+		var remoteStoryRepoCommit RemoteStoryRepoCommit
+		if err := rdb.Fetch(remoteCursor, &remoteStoryRepoCommit); err != nil {
+			return errors.Default.Wrap(err, "error fetching remoteCursor")
+		}
+		storyRepoCommit := &models.ZentaoStoryRepoCommit{
+			ConnectionId: connectionId,
+			Project:      projectId,
+			RepoUrl:      remoteStoryRepoCommit.RepoUrl,
+			CommitSha:    remoteStoryRepoCommit.CommitSha,
+			// The model stores the issue id as a decimal string.
+			IssueId: strconv.FormatInt(remoteStoryRepoCommit.IssueID, 10),
+		}
+		storyRepoCommit.NoPKModel.RawDataParams = h.rawDataParams
+		if err := h.storyRepoCommitBachSave.Add(storyRepoCommit); err != nil {
+			return err
+		}
+	}
+	return h.storyRepoCommitBachSave.Flush()
+}
+
+// newStoryRepoCommitHandler builds the handler used by DBGetStoryRepoCommits.
+//
+// It obtains the batch saver for models.ZentaoStoryRepoCommit from divider,
+// serializes data.Options.GetParams() to JSON as the raw-data params string,
+// and deletes the ZentaoStoryRepoCommit rows whose _raw_data_params equals
+// that string before any new rows are collected.
+//
+// A JSON marshaling failure is returned instead of ignored: otherwise the
+// empty string would be used as the delete filter and as the row tag.
+func newStoryRepoCommitHandler(taskCtx plugin.SubTaskContext, divider *api.BatchSaveDivider) (*storyRepoCommitHandler, errors.Error) {
+	data := taskCtx.GetData().(*ZentaoTaskData)
+
+	storyRepoCommitBachSave, err := divider.ForType(reflect.TypeOf(&models.ZentaoStoryRepoCommit{}))
+	if err != nil {
+		return nil, err
+	}
+	blob, marshalErr := json.Marshal(data.Options.GetParams())
+	if marshalErr != nil {
+		return nil, errors.Default.Wrap(marshalErr, "error marshaling raw data params")
+	}
+	rawDataParams := string(blob)
+	db := taskCtx.GetDal()
+	err = db.Delete(&models.ZentaoStoryRepoCommit{}, dal.Where("_raw_data_params = ?", rawDataParams))
+	if err != nil {
+		return nil, err
+	}
+	return &storyRepoCommitHandler{
+		rawDataParams:           rawDataParams,
+		storyRepoCommitBachSave: storyRepoCommitBachSave,
+	}, nil
+}
diff --git a/backend/plugins/zentao/tasks/task_repo_commits_dbget.go b/backend/plugins/zentao/tasks/task_repo_commits_dbget.go
new file mode 100644
index 000000000..630d2476e
--- /dev/null
+++ b/backend/plugins/zentao/tasks/task_repo_commits_dbget.go
@@ -0,0 +1,187 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tasks
+
+import (
+ "encoding/json"
+ "reflect"
+ "strconv"
+
+ "github.com/apache/incubator-devlake/core/dal"
+ "github.com/apache/incubator-devlake/core/errors"
+ "github.com/apache/incubator-devlake/core/plugin"
+ "github.com/apache/incubator-devlake/helpers/pluginhelper/api"
+ "github.com/apache/incubator-devlake/plugins/zentao/models"
+)
+
+// Compile-time check that DBGetTaskRepoCommits matches plugin.SubTaskEntryPoint.
+var _ plugin.SubTaskEntryPoint = DBGetTaskRepoCommits
+
+// DBGetTaskRepoCommitsMeta is the subtask metadata whose entry point is
+// DBGetTaskRepoCommits. It is enabled by default and tagged with the ticket
+// domain type.
+var DBGetTaskRepoCommitsMeta = plugin.SubTaskMeta{
+ Name: "collectTaskRepoCommits",
+ EntryPoint: DBGetTaskRepoCommits,
+ EnabledByDefault: true,
+ Description: "Get task commits data from Zentao database",
+ DomainTypes: []string{plugin.DOMAIN_TYPE_TICKET},
+}
+
+// DBGetTaskRepoCommits is the subtask entry point that reads task-to-commit
+// links for the configured project from the Zentao database (data.RemoteDb)
+// and saves them through a batch-save divider.
+//
+// It returns nil without doing anything when data.RemoteDb is nil. An error
+// from closing the divider is returned to the caller instead of being raised
+// as a panic; an error produced earlier in the function takes precedence.
+func DBGetTaskRepoCommits(taskCtx plugin.SubTaskContext) (err errors.Error) {
+	data := taskCtx.GetData().(*ZentaoTaskData)
+
+	// skip if no RemoteDb
+	if data.RemoteDb == nil {
+		return nil
+	}
+
+	divider := api.NewBatchSaveDivider(taskCtx, 500, "", "")
+	defer func() {
+		// Report a close failure through the named result, but never mask
+		// an error that is already being returned.
+		if closeErr := divider.Close(); closeErr != nil && err == nil {
+			err = closeErr
+		}
+	}()
+	handler, handlerErr := newTaskRepoCommitHandler(taskCtx, divider)
+	if handlerErr != nil {
+		return handlerErr
+	}
+	return handler.collectTaskRepoCommit(
+		taskCtx.GetDal(),
+		data.RemoteDb,
+		data.Options.ProjectId,
+		data.Options.ConnectionId,
+	)
+}
+
+// taskRepoCommitHandler carries the state used while collecting task repo
+// commits: the raw-data params string stamped on every saved row and the
+// batch saver that rows are added to.
+type taskRepoCommitHandler struct {
+ rawDataParams string
+ taskRepoCommitBachSave *api.BatchSave
+}
+
+// toolZentaoTask receives one row of the local task-id query (column task_id).
+type toolZentaoTask struct {
+ TaskID int64 `gorm:"type:integer"`
+}
+
+// RemoteTaskRepoCommit receives one row of the remote Zentao query that links
+// a task (issue_id) to a repository path (repo_url) and a revision (commit_sha).
+// NOTE(review): the query in collectTaskRepoCommit does not select a project
+// column, so Project appears to stay at its zero value there - confirm.
+type RemoteTaskRepoCommit struct {
+ Project int64 `gorm:"type:integer"`
+ IssueID int64 `gorm:"type:integer"`
+ RepoUrl string
+ CommitSha string
+}
+
+// collectTaskRepoCommit links the project's tasks to the commits recorded for
+// them in the Zentao database.
+//
+// It reads the distinct task ids for projectId/connectionId from the local
+// _tool_zentao_tasks table via db, then asks rdb for 'gitcommited' actions on
+// those tasks, joined to zt_repohistory on the first 10 characters of the
+// revision and to zt_repo for the repository path. Every result row is added
+// to the batch saver as a models.ZentaoTaskRepoCommit tagged with
+// h.rawDataParams, and the saver is flushed before returning.
+//
+// When no task ids are found the remote query is skipped and nil is returned.
+func (h taskRepoCommitHandler) collectTaskRepoCommit(db dal.Dal, rdb dal.Dal, projectId int64, connectionId uint64) errors.Error {
+	taskCursor, err := db.RawCursor(`
+		SELECT
+			DISTINCT id AS task_id
+		FROM
+			_tool_zentao_tasks AS fact_task
+		WHERE
+			fact_task.project = ? AND
+			fact_task.connection_id = ?
+	`, projectId, connectionId)
+	if err != nil {
+		return err
+	}
+	defer taskCursor.Close()
+	var taskIds []int64
+	for taskCursor.Next() {
+		var row toolZentaoTask
+		if err := db.Fetch(taskCursor, &row); err != nil {
+			return errors.Default.Wrap(err, "error fetching taskCursor")
+		}
+		taskIds = append(taskIds, row.TaskID)
+	}
+	// Nothing to look up: do not send a query with an empty IN list.
+	if len(taskIds) == 0 {
+		return nil
+	}
+
+	remoteCursor, err := rdb.RawCursor(`
+		SELECT
+			DISTINCT dim_task_commits.task_id AS issue_id,
+			dim_repo.path AS repo_url,
+			dim_revision.revision AS commit_sha
+		FROM (
+			SELECT
+				fact_action.objectID AS task_id,
+				fact_action.project AS project_id,
+				fact_action.product AS product_id,
+				fact_action.extra AS short_commit_hexsha
+			FROM
+				zt_action AS fact_action
+			WHERE
+				fact_action.objectType IN ('task') AND
+				fact_action.objectID IN ? AND
+				fact_action.action IN ('gitcommited')
+		) AS dim_task_commits
+		INNER JOIN (
+			SELECT
+				fact_repo_hist.repo AS repo_id,
+				fact_repo_hist.revision AS revision,
+				LEFT(fact_repo_hist.revision, 10) AS short_commit_hexsha
+			FROM
+				zt_repohistory AS fact_repo_hist
+		) AS dim_revision
+		ON
+			dim_task_commits.short_commit_hexsha = dim_revision.short_commit_hexsha
+		INNER JOIN
+			zt_repo AS dim_repo
+		ON
+			dim_revision.repo_id = dim_repo.id
+	`, taskIds)
+	if err != nil {
+		return err
+	}
+	defer remoteCursor.Close()
+
+	for remoteCursor.Next() {
+		var remoteTaskRepoCommit RemoteTaskRepoCommit
+		if err := rdb.Fetch(remoteCursor, &remoteTaskRepoCommit); err != nil {
+			return errors.Default.Wrap(err, "error fetching remoteCursor")
+		}
+		taskRepoCommit := &models.ZentaoTaskRepoCommit{
+			ConnectionId: connectionId,
+			Project:      projectId,
+			RepoUrl:      remoteTaskRepoCommit.RepoUrl,
+			CommitSha:    remoteTaskRepoCommit.CommitSha,
+			// The model stores the issue id as a decimal string.
+			IssueId: strconv.FormatInt(remoteTaskRepoCommit.IssueID, 10),
+		}
+		taskRepoCommit.NoPKModel.RawDataParams = h.rawDataParams
+		if err := h.taskRepoCommitBachSave.Add(taskRepoCommit); err != nil {
+			return err
+		}
+	}
+	return h.taskRepoCommitBachSave.Flush()
+}
+
+// newTaskRepoCommitHandler builds the handler used by DBGetTaskRepoCommits.
+//
+// It obtains the batch saver for models.ZentaoTaskRepoCommit from divider,
+// serializes data.Options.GetParams() to JSON as the raw-data params string,
+// and deletes the ZentaoTaskRepoCommit rows whose _raw_data_params equals that
+// string before any new rows are collected.
+//
+// A JSON marshaling failure is returned instead of ignored: otherwise the
+// empty string would be used as the delete filter and as the row tag.
+func newTaskRepoCommitHandler(taskCtx plugin.SubTaskContext, divider *api.BatchSaveDivider) (*taskRepoCommitHandler, errors.Error) {
+	data := taskCtx.GetData().(*ZentaoTaskData)
+
+	taskRepoCommitBachSave, err := divider.ForType(reflect.TypeOf(&models.ZentaoTaskRepoCommit{}))
+	if err != nil {
+		return nil, err
+	}
+	blob, marshalErr := json.Marshal(data.Options.GetParams())
+	if marshalErr != nil {
+		return nil, errors.Default.Wrap(marshalErr, "error marshaling raw data params")
+	}
+	rawDataParams := string(blob)
+	db := taskCtx.GetDal()
+	err = db.Delete(&models.ZentaoTaskRepoCommit{}, dal.Where("_raw_data_params = ?", rawDataParams))
+	if err != nil {
+		return nil, err
+	}
+	return &taskRepoCommitHandler{
+		rawDataParams:          rawDataParams,
+		taskRepoCommitBachSave: taskRepoCommitBachSave,
+	}, nil
+}