This is an automated email from the ASF dual-hosted git repository.
klesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new 078af650 refactor: refactor migrationscripts (#3473)
078af650 is described below
commit 078af650db51b00a3aa7c9a04582eaed417ab6a0
Author: mappjzc <[email protected]>
AuthorDate: Wed Oct 26 14:21:50 2022 +0800
refactor: refactor migrationscripts (#3473)
* refactor: refactor migrationscripts
refactor migrationscripts for gitlab
Add AddTablerColumn and DropTablerColumn for dal
Nddtfjiang <[email protected]>
* fix: fix for review
Add UpdateColumns
Nddtfjiang <[email protected]>
* refactor: add change columns type one by one
Add ChangeColumnsTypeOneByOne.
Nddtfjiang <[email protected]>
* refactor: merge auto migrate table
merged AutoMigrateTables and AutoMigrate
Nddtfjiang <[email protected]>
* refactor: for review
for review: changed function names and notes
Nddtfjiang <[email protected]>
* refactor: refactor dal
move UpdateColumns to UpdateAllColumn
add UpdateColumns for modify the specified group column
remove RawCursor per Klesh's suggestion.
Nddtfjiang <[email protected]>
* fix: fix unchanged file lint error
fix an "Unchanged files with check annotations" lint warning.
Nddtfjiang <[email protected]>
---
helpers/migrationhelper/migrationhelper.go | 132 +++++++++++++++++++++
impl/dalgorm/dalgorm.go | 74 +++++++++---
.../migrationscripts/20220903_encrypt_blueprint.go | 8 +-
.../migrationscripts/20220904_encrypt_pipeline.go | 8 +-
.../20220905_modfiy_cicd_pipeline.go | 6 +-
.../migrationscripts/20220908_modfiy_cicd_tasks.go | 6 +-
.../20220913_add_origin_value_for_pr.go | 6 +-
.../20220915_rename_pipeline_commits.go | 17 +--
.../20220929_change_leadtimeminutes_to_int64.go | 52 +++-----
plugins/azure/api/blueprint.go | 5 +-
plugins/core/dal/dal.go | 38 ++++--
plugins/gitlab/impl/impl.go | 6 +-
.../migrationscripts/20220714_add_init_tables.go | 41 ++++---
...odify_gilab_ci.go => 20220729_add_gitlab_ci.go} | 38 +++---
.../migrationscripts/20220804_add_pipeline_id.go | 17 +--
.../20220906_fix_duration_to_float8.go | 66 ++++-------
.../20220907_add_pipeline_projects_tables.go | 14 +--
plugins/gitlab/models/migrationscripts/register.go | 8 +-
plugins/jira/tasks/epic_collector.go | 41 ++++---
.../tasks/refs_pr_cherry_pick_calculator.go | 43 +++----
plugins/starrocks/tasks.go | 12 +-
21 files changed, 406 insertions(+), 232 deletions(-)
diff --git a/helpers/migrationhelper/migrationhelper.go
b/helpers/migrationhelper/migrationhelper.go
index f8f67acb..d02650b1 100644
--- a/helpers/migrationhelper/migrationhelper.go
+++ b/helpers/migrationhelper/migrationhelper.go
@@ -42,6 +42,138 @@ func AutoMigrateTables(basicRes core.BasicRes, dst
...interface{}) errors.Error
return nil
}
+// ChangeColumnsType change the type of specified columns for the table
+func ChangeColumnsType[D any](
+ basicRes core.BasicRes,
+ script core.MigrationScript,
+ tableName string,
+ columns []string,
+ update func(tmpColumnParams []interface{}) errors.Error,
+) (err errors.Error) {
+ db := basicRes.GetDal()
+ tmpColumnsNames := make([]string, len(columns))
+ for i, v := range columns {
+ tmpColumnsNames[i] = fmt.Sprintf("%s_%s", v, hashScript(script))
+ err = db.RenameColumn(tableName, v, tmpColumnsNames[i])
+ if err != nil {
+ return err
+ }
+
+ defer func(tmpColumnName string, ColumnsName string) {
+ if err != nil {
+ err1 := db.RenameColumn(tableName,
tmpColumnName, ColumnsName)
+ if err1 != nil {
+ err = errors.Default.Wrap(err,
fmt.Sprintf("RollBack by RenameColum failed.Relevant data needs to be repaired
manually.%s", err1.Error()))
+ }
+ }
+ }(tmpColumnsNames[i], v)
+ }
+
+ err = db.AutoMigrate(new(D), dal.From(tableName))
+ if err != nil {
+ return errors.Default.Wrap(err, "AutoMigrate for Add Colume
Error")
+ }
+
+ defer func() {
+ if err != nil {
+ err1 := db.DropColumns(tableName, columns...)
+ if err1 != nil {
+ err = errors.Default.Wrap(err,
fmt.Sprintf("RollBack by DropColume failed.Relevant data needs to be repaired
manually.%s", err1.Error()))
+ }
+ }
+ }()
+
+ if update == nil {
+ dalSet := make([]dal.DalSet, 0, len(columns))
+ for i, v := range columns {
+ dalSet = append(dalSet, dal.DalSet{
+ ColumnName: v,
+ Value: dal.ClauseColumn{Name:
tmpColumnsNames[i]},
+ })
+ }
+ err = db.UpdateColumns(
+ new(D),
+ dalSet,
+ )
+ } else {
+ params := make([]interface{}, 0, len(tmpColumnsNames))
+ for _, v := range tmpColumnsNames {
+ params = append(params, dal.ClauseColumn{Name: v})
+ }
+ err = update(params)
+ }
+ if err != nil {
+ return err
+ }
+
+ err = db.DropColumns(tableName, tmpColumnsNames...)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// TransformColumns change the type of specified columns for the table and
transform data one by one
+func TransformColumns[S any, D any](
+ basicRes core.BasicRes,
+ script core.MigrationScript,
+ tableName string,
+ columns []string,
+ transform func(src *S) (*D, errors.Error),
+) (err errors.Error) {
+ db := basicRes.GetDal()
+ return ChangeColumnsType[D](
+ basicRes,
+ script,
+ tableName,
+ columns,
+ func(tmpColumnParams []interface{}) errors.Error {
+ // create selectStr for transform tmpColumnsNames
+ params := make([]interface{}, 0, len(columns)*2)
+ selectStr := " * "
+ for i, v := range columns {
+ selectStr += ",? as ?"
+ params = append(params, tmpColumnParams[i])
+ params = append(params, dal.ClauseColumn{Name:
v})
+ }
+
+ cursor, err := db.Cursor(
+ dal.Select(selectStr, params...),
+ dal.From(dal.ClauseTable{Name: tableName}),
+ )
+ if err != nil {
+ return errors.Default.Wrap(err,
fmt.Sprintf("failed to load data from src table [%s]", tableName))
+ }
+
+ defer cursor.Close()
+ batch, err := helper.NewBatchSave(basicRes,
reflect.TypeOf(new(D)), 200, tableName)
+ if err != nil {
+ return errors.Default.Wrap(err,
fmt.Sprintf("failed to instantiate BatchSave for table [%s]", tableName))
+ }
+ defer batch.Close()
+ src := new(S)
+ for cursor.Next() {
+ err = db.Fetch(cursor, src)
+ if err != nil {
+ return errors.Default.Wrap(err,
fmt.Sprintf("fail to load record from table [%s]", tableName))
+ }
+
+ dst, err := transform(src)
+
+ if err != nil {
+ return errors.Default.Wrap(err,
fmt.Sprintf("failed to update row %v", src))
+ }
+ err = batch.Add(dst)
+ if err != nil {
+ return errors.Default.Wrap(err,
fmt.Sprintf("push to BatchSave failed %v", dst))
+ }
+ }
+ return nil
+ },
+ )
+}
+
// TransformTable can be used when we need to change the table structure and
reprocess all the data in the table.
func TransformTable[S any, D any](
basicRes core.BasicRes,
diff --git a/impl/dalgorm/dalgorm.go b/impl/dalgorm/dalgorm.go
index 9801332e..9578afe1 100644
--- a/impl/dalgorm/dalgorm.go
+++ b/impl/dalgorm/dalgorm.go
@@ -35,15 +35,41 @@ type Dalgorm struct {
db *gorm.DB
}
+func transformParams(params []interface{}) []interface{} {
+ tp := make([]interface{}, 0, len(params))
+
+ for _, v := range params {
+ switch p := v.(type) {
+ case dal.ClauseColumn:
+ tp = append(tp, clause.Column{
+ Table: p.Table,
+ Name: p.Name,
+ Alias: p.Alias,
+ Raw: p.Raw,
+ })
+ case dal.ClauseTable:
+ tp = append(tp, clause.Table{
+ Name: p.Name,
+ Alias: p.Alias,
+ Raw: p.Raw,
+ })
+ default:
+ tp = append(tp, p)
+ }
+ }
+
+ return tp
+}
+
func buildTx(tx *gorm.DB, clauses []dal.Clause) *gorm.DB {
for _, c := range clauses {
t := c.Type
d := c.Data
switch t {
case dal.JoinClause:
- tx = tx.Joins(d.(dal.DalClause).Expr,
d.(dal.DalClause).Params...)
+ tx = tx.Joins(d.(dal.DalClause).Expr,
transformParams(d.(dal.DalClause).Params)...)
case dal.WhereClause:
- tx = tx.Where(d.(dal.DalClause).Expr,
d.(dal.DalClause).Params...)
+ tx = tx.Where(d.(dal.DalClause).Expr,
transformParams(d.(dal.DalClause).Params)...)
case dal.OrderbyClause:
tx = tx.Order(d.(string))
case dal.LimitClause:
@@ -51,17 +77,26 @@ func buildTx(tx *gorm.DB, clauses []dal.Clause) *gorm.DB {
case dal.OffsetClause:
tx = tx.Offset(d.(int))
case dal.FromClause:
- if str, ok := d.(string); ok {
- tx = tx.Table(str)
- } else {
+ switch dd := d.(type) {
+ case string:
+ tx = tx.Table(dd)
+ case dal.DalClause:
+ tx = tx.Table(dd.Expr,
transformParams(dd.Params)...)
+ case dal.ClauseTable:
+ tx = tx.Table(" ? ", clause.Table{
+ Name: dd.Name,
+ Alias: dd.Alias,
+ Raw: dd.Raw,
+ })
+ default:
tx = tx.Model(d)
}
case dal.SelectClause:
- tx = tx.Select(d.(string))
+ tx = tx.Select(d.(dal.DalClause).Expr,
transformParams(d.(dal.DalClause).Params)...)
case dal.GroupbyClause:
tx = tx.Group(d.(string))
case dal.HavingClause:
- tx = tx.Having(d.(dal.DalClause).Expr,
d.(dal.DalClause).Params...)
+ tx = tx.Having(d.(dal.DalClause).Expr,
transformParams(d.(dal.DalClause).Params)...)
}
}
return tx
@@ -69,14 +104,9 @@ func buildTx(tx *gorm.DB, clauses []dal.Clause) *gorm.DB {
var _ dal.Dal = (*Dalgorm)(nil)
-// RawCursor executes raw sql query and returns a database cursor
-func (d *Dalgorm) RawCursor(query string, params ...interface{}) (*sql.Rows,
errors.Error) {
- return errors.Convert01(d.db.Raw(query, params...).Rows())
-}
-
// Exec executes raw sql query
func (d *Dalgorm) Exec(query string, params ...interface{}) errors.Error {
- return errors.Convert(d.db.Exec(query, params...).Error)
+ return errors.Convert(d.db.Exec(query,
transformParams(params)...).Error)
}
// AutoMigrate runs auto migration for given models
@@ -154,13 +184,27 @@ func (d *Dalgorm) Delete(entity interface{}, clauses
...dal.Clause) errors.Error
// UpdateColumn allows you to update mulitple records
func (d *Dalgorm) UpdateColumn(entity interface{}, columnName string, value
interface{}, clauses ...dal.Clause) errors.Error {
if expr, ok := value.(dal.DalClause); ok {
- value = gorm.Expr(expr.Expr, expr.Params...)
+ value = gorm.Expr(expr.Expr, transformParams(expr.Params)...)
}
return errors.Convert(buildTx(d.db,
clauses).Model(entity).Update(columnName, value).Error)
}
// UpdateColumns allows you to update multiple columns of mulitple records
-func (d *Dalgorm) UpdateColumns(entity interface{}, clauses ...dal.Clause)
errors.Error {
+func (d *Dalgorm) UpdateColumns(entity interface{}, set []dal.DalSet, clauses
...dal.Clause) errors.Error {
+ updatesSet := make(map[string]interface{})
+
+ for _, s := range set {
+ if expr, ok := s.Value.(dal.DalClause); ok {
+ s.Value = gorm.Expr(expr.Expr,
transformParams(expr.Params)...)
+ }
+ updatesSet[s.ColumnName] = s.Value
+ }
+
+ return errors.Convert(buildTx(d.db,
clauses).Model(entity).Updates(updatesSet).Error)
+}
+
+// UpdateAllColumn updated all Columns of entity
+func (d *Dalgorm) UpdateAllColumn(entity interface{}, clauses ...dal.Clause)
errors.Error {
return errors.Convert(buildTx(d.db,
clauses).UpdateColumns(entity).Error)
}
diff --git a/models/migrationscripts/20220903_encrypt_blueprint.go
b/models/migrationscripts/20220903_encrypt_blueprint.go
index c5171bc1..d1ca7af7 100644
--- a/models/migrationscripts/20220903_encrypt_blueprint.go
+++ b/models/migrationscripts/20220903_encrypt_blueprint.go
@@ -30,7 +30,7 @@ var _ core.MigrationScript = (*encryptBlueprint)(nil)
type encryptBlueprint struct{}
-type Blueprint20220903Before struct {
+type blueprint20220903Before struct {
Name string `json:"name" validate:"required"`
Mode string `json:"mode" gorm:"varchar(20)"
validate:"required,oneof=NORMAL ADVANCED"`
Plan json.RawMessage `json:"plan"`
@@ -41,7 +41,7 @@ type Blueprint20220903Before struct {
archived.Model `swaggerignore:"true"`
}
-type Blueprint20220903After struct {
+type blueprint20220903After struct {
/* unchanged part */
Name string `json:"name" validate:"required"`
Mode string `json:"mode" gorm:"varchar(20)"
validate:"required,oneof=NORMAL ADVANCED"`
@@ -64,7 +64,7 @@ func (script *encryptBlueprint) Up(basicRes core.BasicRes)
errors.Error {
basicRes,
script,
"_devlake_blueprints",
- func(s *Blueprint20220903Before) (*Blueprint20220903After,
errors.Error) {
+ func(s *blueprint20220903Before) (*blueprint20220903After,
errors.Error) {
encryptedPlan, err := core.Encrypt(encKey,
string(s.Plan))
if err != nil {
return nil, err
@@ -74,7 +74,7 @@ func (script *encryptBlueprint) Up(basicRes core.BasicRes)
errors.Error {
return nil, err
}
- dst := &Blueprint20220903After{
+ dst := &blueprint20220903After{
Name: s.Name,
Mode: s.Mode,
Enable: s.Enable,
diff --git a/models/migrationscripts/20220904_encrypt_pipeline.go
b/models/migrationscripts/20220904_encrypt_pipeline.go
index 9f64dd86..34391975 100644
--- a/models/migrationscripts/20220904_encrypt_pipeline.go
+++ b/models/migrationscripts/20220904_encrypt_pipeline.go
@@ -32,7 +32,7 @@ var _ core.MigrationScript = (*encryptPipeline)(nil)
type encryptPipeline struct{}
-type Pipeline20220904Before struct {
+type pipeline20220904Before struct {
archived.Model
Name string `json:"name" gorm:"index"`
BlueprintId uint64 `json:"blueprintId"`
@@ -47,7 +47,7 @@ type Pipeline20220904Before struct {
Stage int `json:"stage"`
}
-type Pipeline0904After struct {
+type pipeline0904After struct {
common.Model
Name string `json:"name" gorm:"index"`
BlueprintId uint64 `json:"blueprintId"`
@@ -72,13 +72,13 @@ func (script *encryptPipeline) Up(basicRes core.BasicRes)
errors.Error {
basicRes,
script,
"_devlake_pipelines",
- func(s *Pipeline20220904Before) (*Pipeline0904After,
errors.Error) {
+ func(s *pipeline20220904Before) (*pipeline0904After,
errors.Error) {
encryptedPlan, err := core.Encrypt(encKey,
string(s.Plan))
if err != nil {
return nil, err
}
- dst := &Pipeline0904After{
+ dst := &pipeline0904After{
Name: s.Name,
BlueprintId: s.BlueprintId,
FinishedTasks: s.FinishedTasks,
diff --git a/models/migrationscripts/20220905_modfiy_cicd_pipeline.go
b/models/migrationscripts/20220905_modfiy_cicd_pipeline.go
index 75ea909d..9b9dad98 100644
--- a/models/migrationscripts/20220905_modfiy_cicd_pipeline.go
+++ b/models/migrationscripts/20220905_modfiy_cicd_pipeline.go
@@ -28,13 +28,13 @@ var _ core.MigrationScript = (*modifyCicdPipeline)(nil)
type modifyCicdPipeline struct{}
-type CICDPipelineRelationship20220905 struct {
+type cicdPipelineRelationship20220905 struct {
ParentPipelineId string `gorm:"primaryKey;type:varchar(255)"`
ChildPipelineId string `gorm:"primaryKey;type:varchar(255)"`
archived.NoPKModel
}
-func (CICDPipelineRelationship20220905) TableName() string {
+func (cicdPipelineRelationship20220905) TableName() string {
return "cicd_pipeline_relationships"
}
@@ -48,7 +48,7 @@ func (*modifyCicdPipeline) Up(basicRes core.BasicRes)
errors.Error {
if err != nil {
return err
}
- err = db.AutoMigrate(&CICDPipelineRelationship20220905{})
+ err = db.AutoMigrate(&cicdPipelineRelationship20220905{})
if err != nil {
return errors.Convert(err)
}
diff --git a/models/migrationscripts/20220908_modfiy_cicd_tasks.go
b/models/migrationscripts/20220908_modfiy_cicd_tasks.go
index fa669536..2f0d3472 100644
--- a/models/migrationscripts/20220908_modfiy_cicd_tasks.go
+++ b/models/migrationscripts/20220908_modfiy_cicd_tasks.go
@@ -26,16 +26,16 @@ var _ core.MigrationScript = (*modifyCICDTasks)(nil)
type modifyCICDTasks struct{}
-type CICDTask0905 struct {
+type cicdTask0905 struct {
Environment string `gorm:"type:varchar(255)"`
}
-func (CICDTask0905) TableName() string {
+func (cicdTask0905) TableName() string {
return "cicd_tasks"
}
func (*modifyCICDTasks) Up(basicRes core.BasicRes) errors.Error {
- return basicRes.GetDal().AutoMigrate(&CICDTask0905{})
+ return basicRes.GetDal().AutoMigrate(&cicdTask0905{})
}
func (*modifyCICDTasks) Version() uint64 {
diff --git a/models/migrationscripts/20220913_add_origin_value_for_pr.go
b/models/migrationscripts/20220913_add_origin_value_for_pr.go
index b26c61d8..3ce3d060 100644
--- a/models/migrationscripts/20220913_add_origin_value_for_pr.go
+++ b/models/migrationscripts/20220913_add_origin_value_for_pr.go
@@ -26,19 +26,19 @@ var _ core.MigrationScript =
(*addOriginChangeValueForPr)(nil)
type addOriginChangeValueForPr struct{}
-type PullRequest0913 struct {
+type pullRequest0913 struct {
OrigCodingTimespan int64
OrigReviewLag int64
OrigReviewTimespan int64
OrigDeployTimespan int64
}
-func (PullRequest0913) TableName() string {
+func (pullRequest0913) TableName() string {
return "pull_requests"
}
func (*addOriginChangeValueForPr) Up(basicRes core.BasicRes) errors.Error {
- return basicRes.GetDal().AutoMigrate(&PullRequest0913{})
+ return basicRes.GetDal().AutoMigrate(&pullRequest0913{})
}
func (*addOriginChangeValueForPr) Version() uint64 {
diff --git a/models/migrationscripts/20220915_rename_pipeline_commits.go
b/models/migrationscripts/20220915_rename_pipeline_commits.go
index 69da85b7..6c46f670 100644
--- a/models/migrationscripts/20220915_rename_pipeline_commits.go
+++ b/models/migrationscripts/20220915_rename_pipeline_commits.go
@@ -27,18 +27,7 @@ var _ core.MigrationScript = (*renamePipelineCommits)(nil)
type renamePipelineCommits struct{}
-type CiCDPipelineRepo20220915Before struct {
- archived.DomainEntity
- CommitSha string `gorm:"primaryKey;type:varchar(255)"`
- Branch string `gorm:"type:varchar(255)"`
- Repo string `gorm:"type:varchar(255)"`
-}
-
-func (CiCDPipelineRepo20220915Before) TableName() string {
- return "cicd_pipeline_repos"
-}
-
-type CiCDPipelineRepo20220915After struct {
+type cicdPipelineRepo20220915After struct {
archived.NoPKModel
PipelineId string `gorm:"primaryKey;type:varchar(255)"`
CommitSha string `gorm:"primaryKey;type:varchar(255)"`
@@ -47,7 +36,7 @@ type CiCDPipelineRepo20220915After struct {
RepoUrl string
}
-func (CiCDPipelineRepo20220915After) TableName() string {
+func (cicdPipelineRepo20220915After) TableName() string {
return "cicd_pipeline_commits"
}
@@ -65,7 +54,7 @@ func (*renamePipelineCommits) Up(basicRes core.BasicRes)
errors.Error {
if err != nil {
return err
}
- err = db.AutoMigrate(CiCDPipelineRepo20220915After{})
+ err = db.AutoMigrate(cicdPipelineRepo20220915After{})
if err != nil {
return err
}
diff --git
a/models/migrationscripts/20220929_change_leadtimeminutes_to_int64.go
b/models/migrationscripts/20220929_change_leadtimeminutes_to_int64.go
index 553b4db7..bce665b2 100644
--- a/models/migrationscripts/20220929_change_leadtimeminutes_to_int64.go
+++ b/models/migrationscripts/20220929_change_leadtimeminutes_to_int64.go
@@ -19,6 +19,7 @@ package migrationscripts
import (
"github.com/apache/incubator-devlake/errors"
+ "github.com/apache/incubator-devlake/helpers/migrationhelper"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/core/dal"
)
@@ -27,52 +28,33 @@ var _ core.MigrationScript =
(*changeLeadTimeMinutesToInt64)(nil)
type changeLeadTimeMinutesToInt64 struct{}
-type Issues20220929 struct {
+type issues20220929 struct {
LeadTimeMinutes int64
}
-func (Issues20220929) TableName() string {
+func (issues20220929) TableName() string {
return "issues"
}
-func (*changeLeadTimeMinutesToInt64) Up(basicRes core.BasicRes) errors.Error {
+func (script *changeLeadTimeMinutesToInt64) Up(basicRes core.BasicRes)
errors.Error {
// Yes, issues.lead_time_minutes might be negative, we ought to change
the type
// for the column from `uint` to `int64`
// related issue:
https://github.com/apache/incubator-devlake/issues/3224
db := basicRes.GetDal()
- bakColumnName := "lead_time_minutes_20220929"
- err := db.RenameColumn("issues", "lead_time_minutes", bakColumnName)
- defer func() {
- if err != nil {
- _ = db.RenameColumn("issues", bakColumnName,
"lead_time_minutes")
- }
- }()
- if err != nil {
- return err
- }
- err = db.AutoMigrate(&Issues20220929{})
- if err != nil {
- return err
- }
- defer func() {
- if err != nil {
- _ = db.DropColumns("issues", "lead_time_minutes")
- }
- }()
- err = db.UpdateColumn(
- &Issues20220929{},
- "lead_time_minutes",
- dal.DalClause{Expr: bakColumnName},
- dal.Where("lead_time_minutes != 0"),
+ return migrationhelper.ChangeColumnsType[issues20220929](
+ basicRes,
+ script,
+ issues20220929{}.TableName(),
+ []string{"lead_time_minutes"},
+ func(tmpColumnParams []interface{}) errors.Error {
+ return db.UpdateColumn(
+ &issues20220929{},
+ "lead_time_minutes",
+ dal.DalClause{Expr: " ? ", Params:
tmpColumnParams},
+ dal.Where("? != 0", tmpColumnParams...),
+ )
+ },
)
- if err != nil {
- return err
- }
- err = db.DropColumns("issues", bakColumnName)
- if err != nil {
- return err
- }
- return nil
}
func (*changeLeadTimeMinutesToInt64) Version() uint64 {
diff --git a/plugins/azure/api/blueprint.go b/plugins/azure/api/blueprint.go
index 6598aae8..29d6612b 100644
--- a/plugins/azure/api/blueprint.go
+++ b/plugins/azure/api/blueprint.go
@@ -19,6 +19,7 @@ package api
import (
"encoding/json"
+
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/helper"
@@ -31,8 +32,8 @@ func MakePipelinePlan(subtaskMetas []core.SubTaskMeta,
connectionId uint64, scop
for i, scopeElem := range scope {
// handle taskOptions and transformationRules, by dumping them
to taskOptions
taskOptions := make(map[string]interface{})
- err = errors.Default.Wrap(json.Unmarshal(scopeElem.Options,
&taskOptions), "unable to deserialize pipeline task options")
- if err != nil {
+ err1 := json.Unmarshal(scopeElem.Options, &taskOptions)
+ if err1 != nil {
return nil, errors.Default.Wrap(err, "unable to
deserialize pipeline task options")
}
taskOptions["connectionId"] = connectionId
diff --git a/plugins/core/dal/dal.go b/plugins/core/dal/dal.go
index a0121256..4d51a6b5 100644
--- a/plugins/core/dal/dal.go
+++ b/plugins/core/dal/dal.go
@@ -34,6 +34,21 @@ type Clause struct {
Data interface{}
}
+// ClauseColumn quote with name
+type ClauseColumn struct {
+ Table string
+ Name string
+ Alias string
+ Raw bool
+}
+
+// ClauseTable quote with name
+type ClauseTable struct {
+ Name string
+ Alias string
+ Raw bool
+}
+
// ColumnMeta column type interface
type ColumnMeta interface {
Name() string
@@ -60,8 +75,6 @@ type Dal interface {
DropColumns(table string, columnName ...string) errors.Error
// Exec executes raw sql query
Exec(query string, params ...interface{}) errors.Error
- // RawCursor executes raw sql query and returns a database cursor
- RawCursor(query string, params ...interface{}) (*sql.Rows, errors.Error)
// Cursor returns a database cursor, cursor is especially useful when
handling big amount of rows of data
Cursor(clauses ...Clause) (*sql.Rows, errors.Error)
// Fetch loads row data from `cursor` into `dst`
@@ -81,7 +94,9 @@ type Dal interface {
// UpdateColumn allows you to update mulitple records
UpdateColumn(entity interface{}, columnName string, value interface{},
clauses ...Clause) errors.Error
// UpdateColumn allows you to update multiple columns of mulitple
records
- UpdateColumns(entity interface{}, clauses ...Clause) errors.Error
+ UpdateColumns(entity interface{}, set []DalSet, clauses ...Clause)
errors.Error
+ // UpdateAllColumn updated all Columns of entity
+ UpdateAllColumn(entity interface{}, clauses ...Clause) errors.Error
// CreateOrUpdate tries to create the record, or fallback to update all
if failed
CreateOrUpdate(entity interface{}, clauses ...Clause) errors.Error
// CreateIfNotExist tries to create the record if not exist
@@ -143,6 +158,11 @@ type DalClause struct {
Params []interface{}
}
+type DalSet struct {
+ ColumnName string
+ Value interface{}
+}
+
const JoinClause string = "Join"
// Join creates a new JoinClause
@@ -174,15 +194,19 @@ func Offset(offset int) Clause {
const FromClause string = "From"
// From creates a new TableClause
-func From(table interface{}) Clause {
- return Clause{Type: FromClause, Data: table}
+func From(table interface{}, params ...interface{}) Clause {
+ if len(params) == 0 {
+ return Clause{Type: FromClause, Data: table}
+ } else {
+ return Clause{Type: FromClause, Data: DalClause{table.(string),
params}}
+ }
}
const SelectClause string = "Select"
// Select creates a new TableClause
-func Select(fields string) Clause {
- return Clause{Type: SelectClause, Data: fields}
+func Select(clause string, params ...interface{}) Clause {
+ return Clause{Type: SelectClause, Data: DalClause{clause, params}}
}
const OrderbyClause string = "OrderBy"
diff --git a/plugins/gitlab/impl/impl.go b/plugins/gitlab/impl/impl.go
index 9127dffb..af16454b 100644
--- a/plugins/gitlab/impl/impl.go
+++ b/plugins/gitlab/impl/impl.go
@@ -19,9 +19,9 @@ package impl
import (
"fmt"
+
"github.com/apache/incubator-devlake/errors"
- "github.com/apache/incubator-devlake/migration"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/gitlab/api"
"github.com/apache/incubator-devlake/plugins/gitlab/models"
@@ -37,7 +37,7 @@ var _ core.PluginInit = (*Gitlab)(nil)
var _ core.PluginModel = (*Gitlab)(nil)
var _ core.PluginTask = (*Gitlab)(nil)
var _ core.PluginApi = (*Gitlab)(nil)
-var _ core.Migratable = (*Gitlab)(nil)
+var _ core.PluginMigration = (*Gitlab)(nil)
var _ core.PluginBlueprintV100 = (*Gitlab)(nil)
var _ core.CloseablePluginTask = (*Gitlab)(nil)
@@ -144,7 +144,7 @@ func (plugin Gitlab) RootPkgPath() string {
return "github.com/apache/incubator-devlake/plugins/gitlab"
}
-func (plugin Gitlab) MigrationScripts() []migration.Script {
+func (plugin Gitlab) MigrationScripts() []core.MigrationScript {
return migrationscripts.All()
}
diff --git a/plugins/gitlab/models/migrationscripts/20220714_add_init_tables.go
b/plugins/gitlab/models/migrationscripts/20220714_add_init_tables.go
index 7e5eec92..ad22091b 100644
--- a/plugins/gitlab/models/migrationscripts/20220714_add_init_tables.go
+++ b/plugins/gitlab/models/migrationscripts/20220714_add_init_tables.go
@@ -18,19 +18,19 @@ limitations under the License.
package migrationscripts
import (
- "context"
- "github.com/apache/incubator-devlake/config"
+ "strconv"
+
"github.com/apache/incubator-devlake/errors"
+ "github.com/apache/incubator-devlake/helpers/migrationhelper"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/gitlab/models/migrationscripts/archived"
- "gorm.io/gorm"
- "gorm.io/gorm/clause"
)
type addInitTables struct{}
-func (*addInitTables) Up(ctx context.Context, db *gorm.DB) errors.Error {
- err := db.Migrator().DropTable(
+func (*addInitTables) Up(baseRes core.BasicRes) errors.Error {
+ db := baseRes.GetDal()
+ err := db.DropTables(
&archived.GitlabProject{},
&archived.GitlabMergeRequest{},
&archived.GitlabCommit{},
@@ -61,10 +61,11 @@ func (*addInitTables) Up(ctx context.Context, db *gorm.DB)
errors.Error {
)
if err != nil {
- return errors.Convert(err)
+ return err
}
- err = db.Migrator().AutoMigrate(
+ err = migrationhelper.AutoMigrateTables(
+ baseRes,
&archived.GitlabProject{},
&archived.GitlabMergeRequest{},
&archived.GitlabCommit{},
@@ -83,13 +84,12 @@ func (*addInitTables) Up(ctx context.Context, db *gorm.DB)
errors.Error {
)
if err != nil {
- return errors.Convert(err)
+ return err
}
- v := config.GetConfig()
- encKey := v.GetString("ENCODE_KEY")
- endPoint := v.GetString("GITLAB_ENDPOINT")
- gitlabAuth := v.GetString("GITLAB_AUTH")
+ encKey := baseRes.GetConfig("ENCODE_KEY")
+ endPoint := baseRes.GetConfig("GITLAB_ENDPOINT")
+ gitlabAuth := baseRes.GetConfig("GITLAB_AUTH")
if encKey == "" || endPoint == "" || gitlabAuth == "" {
return nil
@@ -100,15 +100,18 @@ func (*addInitTables) Up(ctx context.Context, db
*gorm.DB) errors.Error {
conn.Endpoint = endPoint
conn.Token, err = core.Encrypt(encKey, gitlabAuth)
if err != nil {
- return errors.Convert(err)
+ return err
}
- conn.Proxy = v.GetString("GITLAB_PROXY")
- conn.RateLimitPerHour = v.GetInt("GITLAB_API_REQUESTS_PER_HOUR")
-
- err = db.Clauses(clause.OnConflict{DoNothing: true}).Create(conn).Error
+ conn.Proxy = baseRes.GetConfig("GITLAB_PROXY")
+ var err1 error
+ conn.RateLimitPerHour, err1 =
strconv.Atoi(baseRes.GetConfig("GITLAB_API_REQUESTS_PER_HOUR"))
+ if err1 != nil {
+ conn.RateLimitPerHour = 1000
+ }
+ err = db.CreateIfNotExist(conn)
if err != nil {
- return errors.Convert(err)
+ return err
}
return nil
diff --git a/plugins/gitlab/models/migrationscripts/20220729_modify_gilab_ci.go
b/plugins/gitlab/models/migrationscripts/20220729_add_gitlab_ci.go
similarity index 76%
rename from plugins/gitlab/models/migrationscripts/20220729_modify_gilab_ci.go
rename to plugins/gitlab/models/migrationscripts/20220729_add_gitlab_ci.go
index 2d7ae899..4a178acd 100644
--- a/plugins/gitlab/models/migrationscripts/20220729_modify_gilab_ci.go
+++ b/plugins/gitlab/models/migrationscripts/20220729_add_gitlab_ci.go
@@ -18,17 +18,20 @@ limitations under the License.
package migrationscripts
import (
- "context"
- "github.com/apache/incubator-devlake/errors"
"time"
+ "github.com/apache/incubator-devlake/errors"
+ "github.com/apache/incubator-devlake/helpers/migrationhelper"
+ "github.com/apache/incubator-devlake/plugins/core"
+
"github.com/apache/incubator-devlake/models/migrationscripts/archived"
- "gorm.io/gorm"
)
-type modifyGitlabCI struct{}
+// Add gitlab job file for GitlabCI
+// Add gitlab_updated_at on gitlab pipeline for GitlabCI
+type addGitlabCI struct{}
-type GitlabPipeline20220729 struct {
+type gitlabPipeline20220729 struct {
ConnectionId uint64 `gorm:"primaryKey"`
GitlabId int `gorm:"primaryKey"`
@@ -48,11 +51,11 @@ type GitlabPipeline20220729 struct {
archived.NoPKModel
}
-func (GitlabPipeline20220729) TableName() string {
+func (gitlabPipeline20220729) TableName() string {
return "_tool_gitlab_pipelines"
}
-type GitlabJob20220729 struct {
+type gitlabJob20220729 struct {
ConnectionId uint64 `gorm:"primaryKey"`
GitlabId int `gorm:"primaryKey"`
@@ -73,28 +76,27 @@ type GitlabJob20220729 struct {
archived.NoPKModel
}
-func (GitlabJob20220729) TableName() string {
+func (gitlabJob20220729) TableName() string {
return "_tool_gitlab_jobs"
}
-func (*modifyGitlabCI) Up(ctx context.Context, db *gorm.DB) errors.Error {
- err := db.Migrator().AddColumn(&GitlabPipeline20220729{},
"gitlab_updated_at")
- if err != nil {
- return errors.Convert(err)
- }
-
- err = db.Migrator().AutoMigrate(&GitlabJob20220729{})
+func (*addGitlabCI) Up(baseRes core.BasicRes) errors.Error {
+ err := migrationhelper.AutoMigrateTables(
+ baseRes,
+ &gitlabJob20220729{},
+ &gitlabPipeline20220729{},
+ )
if err != nil {
- return errors.Convert(err)
+ return err
}
return nil
}
-func (*modifyGitlabCI) Version() uint64 {
+func (*addGitlabCI) Version() uint64 {
return 20220729231236
}
-func (*modifyGitlabCI) Name() string {
+func (*addGitlabCI) Name() string {
return "pipeline and job"
}
diff --git a/plugins/gitlab/models/migrationscripts/20220804_add_pipeline_id.go
b/plugins/gitlab/models/migrationscripts/20220804_add_pipeline_id.go
index 678543f8..17228d52 100644
--- a/plugins/gitlab/models/migrationscripts/20220804_add_pipeline_id.go
+++ b/plugins/gitlab/models/migrationscripts/20220804_add_pipeline_id.go
@@ -18,17 +18,17 @@ limitations under the License.
package migrationscripts
import (
- "context"
- "github.com/apache/incubator-devlake/errors"
"time"
+ "github.com/apache/incubator-devlake/errors"
+ "github.com/apache/incubator-devlake/plugins/core"
+
"github.com/apache/incubator-devlake/models/migrationscripts/archived"
- "gorm.io/gorm"
)
type addPipelineID struct{}
-type GitlabJob20220804 struct {
+type gitlabJob20220804 struct {
ConnectionId uint64 `gorm:"primaryKey"`
GitlabId int `gorm:"primaryKey"`
@@ -50,14 +50,15 @@ type GitlabJob20220804 struct {
archived.NoPKModel
}
-func (GitlabJob20220804) TableName() string {
+func (gitlabJob20220804) TableName() string {
return "_tool_gitlab_jobs"
}
-func (*addPipelineID) Up(ctx context.Context, db *gorm.DB) errors.Error {
- err := db.Migrator().AddColumn(&GitlabJob20220804{}, "pipeline_id")
+func (*addPipelineID) Up(baseRes core.BasicRes) errors.Error {
+ db := baseRes.GetDal()
+ err := db.AutoMigrate(&gitlabJob20220804{})
if err != nil {
- return errors.Convert(err)
+ return err
}
return nil
}
diff --git
a/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go
b/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go
index fb62cc82..19183bb7 100644
--- a/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go
+++ b/plugins/gitlab/models/migrationscripts/20220906_fix_duration_to_float8.go
@@ -18,63 +18,45 @@ limitations under the License.
package migrationscripts
import (
- "context"
"github.com/apache/incubator-devlake/errors"
- "github.com/apache/incubator-devlake/plugins/gitlab/api"
- "github.com/apache/incubator-devlake/plugins/helper"
- "gorm.io/gorm"
- "reflect"
+ "github.com/apache/incubator-devlake/helpers/migrationhelper"
+ "github.com/apache/incubator-devlake/plugins/core"
)
type fixDurationToFloat8 struct{}
-type GitlabJob20220906 struct {
+type gitlabJob20220906_old struct {
ConnectionId uint64 `gorm:"primaryKey"`
GitlabId int `gorm:"primaryKey"`
- Duration float64 `gorm:"type:text"`
- Duration2 float64 `gorm:"type:float8"`
+ Duration float64 `gorm:"type:text"`
}
+type gitlabJob20220906 struct {
+ ConnectionId uint64 `gorm:"primaryKey"`
+ GitlabId int `gorm:"primaryKey"`
-func (GitlabJob20220906) TableName() string {
- return "_tool_gitlab_jobs"
+ Duration float64 `gorm:"type:float8"`
}
-func (*fixDurationToFloat8) Up(ctx context.Context, db *gorm.DB) errors.Error {
- err := db.Migrator().AddColumn(&GitlabJob20220906{}, `duration2`)
- if err != nil {
- return errors.Convert(err)
- }
- cursor, err :=
db.Model(&GitlabJob20220906{}).Select([]string{"connection_id", "gitlab_id",
"duration"}).Rows()
- if err != nil {
- return errors.Convert(err)
- }
- batch, err := helper.NewBatchSave(api.BasicRes,
reflect.TypeOf(&GitlabJob20220906{}), 500)
- if err != nil {
- return errors.Default.Wrap(err, "error getting batch from
table")
- }
- defer batch.Close()
- for cursor.Next() {
- job := GitlabJob20220906{}
- err = db.ScanRows(cursor, &job)
- if err != nil {
- return errors.Convert(err)
- }
- job.Duration2 = job.Duration
- err = batch.Add(&job)
- if err != nil {
- return errors.Convert(err)
- }
- }
+func (*fixDurationToFloat8) Up(baseRes core.BasicRes) errors.Error {
+ err := migrationhelper.TransformColumns(
+ baseRes,
+ &fixDurationToFloat8{},
+ "_tool_gitlab_jobs",
+ []string{"duration"},
+ func(src *gitlabJob20220906_old) (*gitlabJob20220906,
errors.Error) {
+ return &gitlabJob20220906{
+ ConnectionId: src.ConnectionId,
+ GitlabId: src.GitlabId,
+ Duration: src.Duration,
+ }, nil
+ },
+ )
- err = db.Migrator().DropColumn(&GitlabJob20220906{}, `duration`)
if err != nil {
- return errors.Convert(err)
- }
- err = db.Migrator().RenameColumn(&GitlabJob20220906{}, `duration2`,
`duration`)
- if err != nil {
- return errors.Convert(err)
+ return err
}
+
return nil
}
diff --git
a/plugins/gitlab/models/migrationscripts/20220907_add_pipeline_projects_tables.go
b/plugins/gitlab/models/migrationscripts/20220907_add_pipeline_projects_tables.go
index 456ea859..84bf5c44 100644
---
a/plugins/gitlab/models/migrationscripts/20220907_add_pipeline_projects_tables.go
+++
b/plugins/gitlab/models/migrationscripts/20220907_add_pipeline_projects_tables.go
@@ -18,16 +18,16 @@ limitations under the License.
package migrationscripts
import (
- "context"
"github.com/apache/incubator-devlake/errors"
+ "github.com/apache/incubator-devlake/helpers/migrationhelper"
+ "github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/models/migrationscripts/archived"
- "gorm.io/gorm"
)
type addPipelineProjects struct{}
-type GitlabPipelineProjects20220907 struct {
+type gitlabPipelineProjects20220907 struct {
ConnectionId uint64 `gorm:"primaryKey"`
PipelineId int `gorm:"primaryKey"`
ProjectId int `gorm:"primaryKey"`
@@ -36,14 +36,14 @@ type GitlabPipelineProjects20220907 struct {
archived.NoPKModel
}
-func (GitlabPipelineProjects20220907) TableName() string {
+func (gitlabPipelineProjects20220907) TableName() string {
return "_tool_gitlab_pipeline_projects"
}
-func (*addPipelineProjects) Up(ctx context.Context, db *gorm.DB) errors.Error {
- err := db.Migrator().CreateTable(&GitlabPipelineProjects20220907{})
+func (*addPipelineProjects) Up(baseRes core.BasicRes) errors.Error {
+ err := migrationhelper.AutoMigrateTables(baseRes,
&gitlabPipelineProjects20220907{})
if err != nil {
- return errors.Convert(err)
+ return err
}
return nil
}
diff --git a/plugins/gitlab/models/migrationscripts/register.go
b/plugins/gitlab/models/migrationscripts/register.go
index aba5abaf..a91a1fcd 100644
--- a/plugins/gitlab/models/migrationscripts/register.go
+++ b/plugins/gitlab/models/migrationscripts/register.go
@@ -18,14 +18,14 @@ limitations under the License.
package migrationscripts
import (
- "github.com/apache/incubator-devlake/migration"
+ "github.com/apache/incubator-devlake/plugins/core"
)
// All return all the migration scripts
-func All() []migration.Script {
- return []migration.Script{
+func All() []core.MigrationScript {
+ return []core.MigrationScript{
new(addInitTables),
- new(modifyGitlabCI),
+ new(addGitlabCI),
new(addPipelineID),
new(addPipelineProjects),
new(fixDurationToFloat8),
diff --git a/plugins/jira/tasks/epic_collector.go
b/plugins/jira/tasks/epic_collector.go
index 3b09cee3..78e6c31d 100644
--- a/plugins/jira/tasks/epic_collector.go
+++ b/plugins/jira/tasks/epic_collector.go
@@ -19,17 +19,19 @@ package tasks
import (
"fmt"
+ "reflect"
+ "strings"
+
"github.com/apache/incubator-devlake/errors"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/core/dal"
- "reflect"
- "strings"
"encoding/json"
- "github.com/apache/incubator-devlake/plugins/helper"
"io"
"net/http"
"net/url"
+
+ "github.com/apache/incubator-devlake/plugins/helper"
)
const RAW_EPIC_TABLE = "jira_api_epics"
@@ -108,23 +110,24 @@ func CollectEpics(taskCtx core.SubTaskContext)
errors.Error {
}
func GetEpicKeysIterator(db dal.Dal, data *JiraTaskData, batchSize int)
(helper.Iterator, errors.Error) {
- cursor, err := db.RawCursor(`
- SELECT
- DISTINCT epic_key
- FROM
- _tool_jira_issues i
+ cursor, err := db.Cursor(
+ dal.Select("DISTINCT epic_key"),
+ dal.From("_tool_jira_issues i"),
+ dal.Join(`
LEFT JOIN _tool_jira_board_issues bi ON (
- i.connection_id = bi.connection_id
- AND
- i.issue_id = bi.issue_id
- )
- WHERE
- i.connection_id = ?
- AND
- bi.board_id = ?
- AND
- i.epic_key != ''
- `, data.Options.ConnectionId, data.Options.BoardId)
+ i.connection_id = bi.connection_id
+ AND
+ i.issue_id = bi.issue_id
+ )`),
+ dal.Where(`
+ i.connection_id = ?
+ AND
+ bi.board_id = ?
+ AND
+ i.epic_key != ''
+ `, data.Options.ConnectionId, data.Options.BoardId,
+ ),
+ )
if err != nil {
return nil, errors.Default.Wrap(err, "unable to query for
external epics")
}
diff --git a/plugins/refdiff/tasks/refs_pr_cherry_pick_calculator.go
b/plugins/refdiff/tasks/refs_pr_cherry_pick_calculator.go
index 3154798b..0daeb390 100644
--- a/plugins/refdiff/tasks/refs_pr_cherry_pick_calculator.go
+++ b/plugins/refdiff/tasks/refs_pr_cherry_pick_calculator.go
@@ -18,12 +18,13 @@ limitations under the License.
package tasks
import (
- "github.com/apache/incubator-devlake/errors"
"regexp"
"strconv"
"strings"
"time"
+ "github.com/apache/incubator-devlake/errors"
+
"github.com/apache/incubator-devlake/models/domainlayer/code"
"github.com/apache/incubator-devlake/plugins/core"
"github.com/apache/incubator-devlake/plugins/core/dal"
@@ -114,25 +115,27 @@ func CalculatePrCherryPick(taskCtx core.SubTaskContext)
errors.Error {
taskCtx.IncProgress(1)
}
- cursor2, err := db.RawCursor(
- `
- SELECT pr2.pull_request_key AS
parent_pr_key,
- pr1.parent_pr_id AS
parent_pr_id,
- pr1.base_ref AS
cherrypick_base_branch,
- pr1.pull_request_key AS
cherrypick_pr_key,
- repos.NAME AS
repo_name,
- Concat(repos.url, '/pull/',
pr2.pull_request_key) AS parent_pr_url,
- pr2.created_date
- FROM pull_requests pr1
- LEFT JOIN pull_requests pr2
- ON pr1.parent_pr_id = pr2.id
- LEFT JOIN repos
- ON pr2.base_repo_id = repos.id
- WHERE pr1.parent_pr_id != ''
- ORDER BY pr1.parent_pr_id,
- pr2.created_date,
- pr1.base_ref ASC
- `)
+ cursor2, err := db.Cursor(
+ dal.Select(`
+ pr2.pull_request_key AS parent_pr_key,
+ pr1.parent_pr_id AS parent_pr_id,
+ pr1.base_ref AS
cherrypick_base_branch,
+ pr1.pull_request_key AS
cherrypick_pr_key,
+ repos.NAME AS repo_name,
+ Concat(repos.url, '/pull/', pr2.pull_request_key) AS
parent_pr_url,
+ pr2.created_date
+ `),
+ dal.From(`pull_requests pr1`),
+ dal.Join(`LEFT JOIN pull_requests pr2 ON pr1.parent_pr_id =
pr2.id`),
+ dal.Join(`LEFT JOIN repos ON pr2.base_repo_id = repos.id`),
+
+ dal.Where("pr1.parent_pr_id != ''"),
+ dal.Orderby(`
+ pr1.parent_pr_id,
+ pr2.created_date,
+ pr1.base_ref ASC
+ `),
+ )
if err != nil {
return errors.Convert(err)
}
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index 19440210..f2ea396b 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -223,7 +223,12 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext,
starrocksTable, starrock
var rows *sql.Rows
var data []map[string]interface{}
// select data from db
- rows, err = db.RawCursor(fmt.Sprintf("select * from %s order by
%s limit %d offset %d", table, orderBy, config.BatchSize, offset))
+ rows, err = db.Cursor(
+ dal.From(table),
+ dal.Orderby(orderBy),
+ dal.Limit(config.BatchSize),
+ dal.Offset(offset),
+ )
if err != nil {
return err
}
@@ -337,7 +342,10 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext,
starrocksTable, starrock
return err
}
// check data count
- rows, err := db.RawCursor(fmt.Sprintf("select count(*) from %s", table))
+ rows, err := db.Cursor(
+ dal.Select("count(*)"),
+ dal.From(table),
+ )
if err != nil {
return err
}