This is an automated email from the ASF dual-hosted git repository.

abeizn pushed a commit to branch release-v0.14
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git


The following commit(s) were added to refs/heads/release-v0.14 by this push:
     new d0e43565a feat: cherrypick dbt and starrocks plugin improvement (#3523)
d0e43565a is described below

commit d0e43565ad8105e7cfbe1d9d9bb419db0444809f
Author: long2ice <[email protected]>
AuthorDate: Fri Oct 21 10:29:02 2022 +0800

    feat: cherrypick dbt and starrocks plugin improvement (#3523)
    
    * feat: dbt plugin improvement
    * feat: add ORDER BY support to the starrocks plugin and use a repeatable-read transaction when loading data
---
 config-ui/src/data/pipeline-config-samples/dbt.js  |   4 +-
 .../src/data/pipeline-config-samples/starrocks.js  |   3 +-
 plugins/dbt/api/swagger.go                         |  42 ++++-----
 plugins/dbt/dbt.go                                 |  24 +++--
 plugins/dbt/tasks/convertor.go                     | 100 +++++++++++----------
 plugins/dbt/tasks/{task_data.go => git.go}         |  36 ++++++--
 plugins/dbt/tasks/task_data.go                     |  13 ++-
 plugins/starrocks/starrocks.go                     |   2 +
 plugins/starrocks/task_data.go                     |   7 +-
 plugins/starrocks/tasks.go                         |  74 ++++++++++-----
 10 files changed, 182 insertions(+), 123 deletions(-)

diff --git a/config-ui/src/data/pipeline-config-samples/dbt.js b/config-ui/src/data/pipeline-config-samples/dbt.js
index 292a46d25..fac96b365 100644
--- a/config-ui/src/data/pipeline-config-samples/dbt.js
+++ b/config-ui/src/data/pipeline-config-samples/dbt.js
@@ -21,13 +21,15 @@ const dbtConfig = [
       plugin: 'dbt',
       options: {
         projectPath: '/var/www/html/my-project',
+        projectGitURL: '',
         projectName: 'myproject',
         projectTarget: 'dev',
         selectedModels: ['model_one', 'model_two'],
         projectVars: {
           demokey1: 'demovalue1',
           demokey2: 'demovalue2'
-        }
+        },
+        args: []
       }
     }
   ]
diff --git a/config-ui/src/data/pipeline-config-samples/starrocks.js b/config-ui/src/data/pipeline-config-samples/starrocks.js
index 150e0efce..a49321063 100644
--- a/config-ui/src/data/pipeline-config-samples/starrocks.js
+++ b/config-ui/src/data/pipeline-config-samples/starrocks.js
@@ -31,7 +31,8 @@ const starRocksConfig = [
         be_port: 8040,
         tables: ['_tool_.*'], // support regexp
         batch_size: 10000,
-        extra: '', // will append to create table sql
+        order_by: {},
+        extra: {}, // will append to create table sql
         domain_layer: '' // priority over tables
       }
     }
diff --git a/plugins/dbt/api/swagger.go b/plugins/dbt/api/swagger.go
index 841c2870a..a14c986cc 100644
--- a/plugins/dbt/api/swagger.go
+++ b/plugins/dbt/api/swagger.go
@@ -25,19 +25,23 @@ package api
 // @Router /blueprints/dbt/blueprint-plan [post]
 func _() {}
 
-type DbtBlueprintPlan [][]struct {
-       Plugin  string `json:"plugin"`
-       Options struct {
-               ProjectPath    string   `json:"projectPath"`
-               ProjectName    string   `json:"projectName"`
-               ProjectTarget  string   `json:"projectTarget"`
-               SelectedModels []string `json:"selectedModels"`
-               ProjectVars    struct {
-                       Demokey1 string `json:"demokey1"`
-                       Demokey2 string `json:"demokey2"`
-               } `json:"projectVars"`
-       } `json:"options"`
+type Options struct {
+       ProjectPath    string   `json:"projectPath"`
+       ProjectGitURL  string   `json:"projectGitURL"`
+       ProjectName    string   `json:"projectName"`
+       ProjectTarget  string   `json:"projectTarget"`
+       SelectedModels []string `json:"selectedModels"`
+       Args           []string `json:"args"`
+       ProjectVars    struct {
+               Demokey1 string `json:"demokey1"`
+               Demokey2 string `json:"demokey2"`
+       } `json:"projectVars"`
 }
+type Plan struct {
+       Plugin  string  `json:"plugin"`
+       Options Options `json:"options"`
+}
+type DbtBlueprintPlan [][]Plan
 
 // @Summary pipelines plan for dbt
 // @Description pipelines plan for dbt
@@ -47,16 +51,4 @@ type DbtBlueprintPlan [][]struct {
 // @Router /pipelines/dbt/pipeline-plan [post]
 func _() {}
 
-type DbtPipelinePlan [][]struct {
-       Plugin  string `json:"plugin"`
-       Options struct {
-               ProjectPath    string   `json:"projectPath"`
-               ProjectName    string   `json:"projectName"`
-               ProjectTarget  string   `json:"projectTarget"`
-               SelectedModels []string `json:"selectedModels"`
-               ProjectVars    struct {
-                       Demokey1 string `json:"demokey1"`
-                       Demokey2 string `json:"demokey2"`
-               } `json:"projectVars"`
-       } `json:"options"`
-}
+type DbtPipelinePlan [][]Plan
diff --git a/plugins/dbt/dbt.go b/plugins/dbt/dbt.go
index f336cb977..1a3c19cc0 100644
--- a/plugins/dbt/dbt.go
+++ b/plugins/dbt/dbt.go
@@ -27,8 +27,10 @@ import (
        "github.com/spf13/cobra"
 )
 
-var _ core.PluginMeta = (*Dbt)(nil)
-var _ core.PluginTask = (*Dbt)(nil)
+var (
+       _ core.PluginMeta = (*Dbt)(nil)
+       _ core.PluginTask = (*Dbt)(nil)
+)
 
 type Dbt struct{}
 
@@ -38,6 +40,7 @@ func (plugin Dbt) Description() string {
 
 func (plugin Dbt) SubTaskMetas() []core.SubTaskMeta {
        return []core.SubTaskMeta{
+               tasks.GitMeta,
                tasks.DbtConverterMeta,
        }
 }
@@ -55,15 +58,10 @@ func (plugin Dbt) PrepareTaskData(taskCtx core.TaskContext, 
options map[string]i
        if op.ProjectPath == "" {
                return nil, errors.Default.New("projectPath is required for dbt 
plugin")
        }
-       if op.ProjectName == "" {
-               return nil, errors.Default.New("projectName is required for dbt 
plugin")
-       }
+
        if op.ProjectTarget == "" {
                op.ProjectTarget = "dev"
        }
-       if op.SelectedModels == nil {
-               return nil, errors.Default.New("selectedModels is required for 
dbt plugin")
-       }
 
        return &tasks.DbtTaskData{
                Options: &op,
@@ -81,16 +79,12 @@ func main() {
        dbtCmd := &cobra.Command{Use: "dbt"}
        _ = dbtCmd.MarkFlagRequired("projectPath")
        projectPath := dbtCmd.Flags().StringP("projectPath", "p", 
"/Users/abeizn/demoapp", "user dbt project directory.")
-
-       _ = dbtCmd.MarkFlagRequired("projectName")
+       projectGitURL := dbtCmd.Flags().StringP("projectGitURL", "g", "", "user 
dbt project git url.")
        projectName := dbtCmd.Flags().StringP("projectName", "n", "demoapp", 
"user dbt project name.")
-
        projectTarget := dbtCmd.Flags().StringP("projectTarget", "o", "dev", 
"this is the default target your dbt project will use.")
-
-       _ = dbtCmd.MarkFlagRequired("selectedModels")
        modelsSlice := []string{"my_first_dbt_model", "my_second_dbt_model"}
        selectedModels := dbtCmd.Flags().StringSliceP("models", "m", 
modelsSlice, "dbt select models")
-
+       dbtArgs := dbtCmd.Flags().StringSliceP("args", "a", []string{}, "dbt 
run args")
        projectVars := make(map[string]string)
        projectVars["event_min_id"] = "7581"
        projectVars["event_max_id"] = "7582"
@@ -107,6 +101,8 @@ func main() {
                        "projectTarget":  *projectTarget,
                        "selectedModels": *selectedModels,
                        "projectVars":    projectVarsConvert,
+                       "projectGitURL":  *projectGitURL,
+                       "args":           dbtArgs,
                })
        }
        runner.RunCmd(dbtCmd)
diff --git a/plugins/dbt/tasks/convertor.go b/plugins/dbt/tasks/convertor.go
index 963b0ff78..1498e3c06 100644
--- a/plugins/dbt/tasks/convertor.go
+++ b/plugins/dbt/tasks/convertor.go
@@ -20,7 +20,6 @@ package tasks
 import (
        "bufio"
        "encoding/json"
-       "github.com/apache/incubator-devlake/errors"
        "net"
        "net/url"
        "os"
@@ -28,6 +27,8 @@ import (
        "strconv"
        "strings"
 
+       "github.com/apache/incubator-devlake/errors"
+
        "github.com/apache/incubator-devlake/plugins/core"
        "github.com/spf13/viper"
 )
@@ -41,60 +42,66 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error 
{
        projectName := data.Options.ProjectName
        projectTarget := data.Options.ProjectTarget
        projectVars := data.Options.ProjectVars
-       dbUrl := taskCtx.GetConfig("DB_URL")
-       u, err := errors.Convert01(url.Parse(dbUrl))
+       args := data.Options.Args
+       err := errors.Convert(os.Chdir(projectPath))
        if err != nil {
                return err
        }
-       dbType := u.Scheme
-       dbUsername := u.User.Username()
-       dbPassword, _ := u.User.Password()
-       dbServer, dbPort, _ := net.SplitHostPort(u.Host)
-       dbDataBase := u.Path[1:]
-       var dbSchema string
-       flag := strings.Compare(dbType, "mysql")
-       if flag == 0 {
-               // mysql database
-               dbSchema = dbDataBase
-       } else {
-               // other database
-               mapQuery, err := errors.Convert01(url.ParseQuery(u.RawQuery))
+       _, err = errors.Convert01(os.Stat("profiles.yml"))
+       // if profiles.yml not exist, create it manually
+       if err != nil {
+               dbUrl := taskCtx.GetConfig("DB_URL")
+               u, err := errors.Convert01(url.Parse(dbUrl))
                if err != nil {
                        return err
                }
-               if value, ok := mapQuery["search_path"]; ok {
-                       if len(value) < 1 {
-                               return errors.Default.New("DB_URL search_path 
parses error")
+               dbType := u.Scheme
+               dbUsername := u.User.Username()
+               dbPassword, _ := u.User.Password()
+               dbServer, dbPort, _ := net.SplitHostPort(u.Host)
+               dbDataBase := u.Path[1:]
+               var dbSchema string
+               flag := strings.Compare(dbType, "mysql")
+               if flag == 0 {
+                       // mysql database
+                       dbSchema = dbDataBase
+               } else {
+                       // other database
+                       mapQuery, err := 
errors.Convert01(url.ParseQuery(u.RawQuery))
+                       if err != nil {
+                               return err
                        }
-                       dbSchema = value[0]
+                       if value, ok := mapQuery["search_path"]; ok {
+                               if len(value) < 1 {
+                                       return errors.Default.New("DB_URL 
search_path parses error")
+                               }
+                               dbSchema = value[0]
+                       } else {
+                               dbSchema = "public"
+                       }
+               }
+               config := viper.New()
+               config.Set(projectName+".target", projectTarget)
+               config.Set(projectName+".outputs."+projectTarget+".type", 
dbType)
+               dbPortInt, _ := strconv.Atoi(dbPort)
+               config.Set(projectName+".outputs."+projectTarget+".port", 
dbPortInt)
+               config.Set(projectName+".outputs."+projectTarget+".password", 
dbPassword)
+               config.Set(projectName+".outputs."+projectTarget+".schema", 
dbSchema)
+               if flag == 0 {
+                       
config.Set(projectName+".outputs."+projectTarget+".server", dbServer)
+                       
config.Set(projectName+".outputs."+projectTarget+".username", dbUsername)
+                       
config.Set(projectName+".outputs."+projectTarget+".database", dbDataBase)
                } else {
-                       dbSchema = "public"
+                       
config.Set(projectName+".outputs."+projectTarget+".host", dbServer)
+                       
config.Set(projectName+".outputs."+projectTarget+".user", dbUsername)
+                       
config.Set(projectName+".outputs."+projectTarget+".dbname", dbDataBase)
+               }
+               err = errors.Convert(config.WriteConfigAs("profiles.yml"))
+               if err != nil {
+                       return err
                }
        }
-       err = errors.Convert(os.Chdir(projectPath))
-       if err != nil {
-               return err
-       }
-       config := viper.New()
-       config.Set(projectName+".target", projectTarget)
-       config.Set(projectName+".outputs."+projectTarget+".type", dbType)
-       dbPortInt, _ := strconv.Atoi(dbPort)
-       config.Set(projectName+".outputs."+projectTarget+".port", dbPortInt)
-       config.Set(projectName+".outputs."+projectTarget+".password", 
dbPassword)
-       config.Set(projectName+".outputs."+projectTarget+".schema", dbSchema)
-       if flag == 0 {
-               config.Set(projectName+".outputs."+projectTarget+".server", 
dbServer)
-               config.Set(projectName+".outputs."+projectTarget+".username", 
dbUsername)
-               config.Set(projectName+".outputs."+projectTarget+".database", 
dbDataBase)
-       } else {
-               config.Set(projectName+".outputs."+projectTarget+".host", 
dbServer)
-               config.Set(projectName+".outputs."+projectTarget+".user", 
dbUsername)
-               config.Set(projectName+".outputs."+projectTarget+".dbname", 
dbDataBase)
-       }
-       err = errors.Convert(config.WriteConfigAs("profiles.yml"))
-       if err != nil {
-               return err
-       }
+       // if package.yml exist, install dbt dependencies
        _, err = errors.Convert01(os.Stat("packages.yml"))
        if err == nil {
                cmd := exec.Command("dbt", "deps")
@@ -114,6 +121,9 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error 
{
        }
        dbtExecParams = append(dbtExecParams, "--select")
        dbtExecParams = append(dbtExecParams, models...)
+       if args != nil {
+               dbtExecParams = append(dbtExecParams, args...)
+       }
        cmd := exec.Command(dbtExecParams[0], dbtExecParams[1:]...)
        log.Info("dbt run script: ", cmd)
        stdout, _ := cmd.StdoutPipe()
diff --git a/plugins/dbt/tasks/task_data.go b/plugins/dbt/tasks/git.go
similarity index 50%
copy from plugins/dbt/tasks/task_data.go
copy to plugins/dbt/tasks/git.go
index 7733d8364..2c481e39a 100644
--- a/plugins/dbt/tasks/task_data.go
+++ b/plugins/dbt/tasks/git.go
@@ -17,15 +17,33 @@ limitations under the License.
 
 package tasks
 
-type DbtOptions struct {
-       ProjectPath    string                 `json:"projectPath"`
-       ProjectName    string                 `json:"projectName"`
-       ProjectTarget  string                 `json:"projectTarget"`
-       ProjectVars    map[string]interface{} `json:"projectVars"`
-       SelectedModels []string               `json:"selectedModels"`
-       Tasks          []string               `json:"tasks,omitempty"`
+import (
+       "os/exec"
+
+       "github.com/apache/incubator-devlake/errors"
+       "github.com/apache/incubator-devlake/plugins/core"
+)
+
+func Git(taskCtx core.SubTaskContext) errors.Error {
+       logger := taskCtx.GetLogger()
+       data := taskCtx.GetData().(*DbtTaskData)
+       if data.Options.ProjectGitURL == "" {
+               return nil
+       }
+       cmd := exec.Command("git", "clone", data.Options.ProjectGitURL)
+       logger.Info("start clone dbt project: %v", cmd)
+       out, err := cmd.CombinedOutput()
+       if err != nil {
+               logger.Error(err, "clone dbt project failed")
+               return errors.Convert(err)
+       }
+       logger.Info("clone dbt project success: %v", string(out))
+       return nil
 }
 
-type DbtTaskData struct {
-       Options *DbtOptions
+var GitMeta = core.SubTaskMeta{
+       Name:             "Git",
+       EntryPoint:       Git,
+       EnabledByDefault: true,
+       Description:      "Clone dbt project from git",
 }
diff --git a/plugins/dbt/tasks/task_data.go b/plugins/dbt/tasks/task_data.go
index 7733d8364..7bb20a702 100644
--- a/plugins/dbt/tasks/task_data.go
+++ b/plugins/dbt/tasks/task_data.go
@@ -18,12 +18,17 @@ limitations under the License.
 package tasks
 
 type DbtOptions struct {
-       ProjectPath    string                 `json:"projectPath"`
-       ProjectName    string                 `json:"projectName"`
-       ProjectTarget  string                 `json:"projectTarget"`
+       ProjectPath   string `json:"projectPath"`
+       ProjectName   string `json:"projectName"`
+       ProjectTarget string `json:"projectTarget"`
+       // clone from git to projectPath if projectGitURL is not empty
+       ProjectGitURL string `json:"projectGitURL"`
+       // deprecated, use args instead
        ProjectVars    map[string]interface{} `json:"projectVars"`
        SelectedModels []string               `json:"selectedModels"`
-       Tasks          []string               `json:"tasks,omitempty"`
+       // dbt run args
+       Args  []string `json:"args"`
+       Tasks []string `json:"tasks,omitempty"`
 }
 
 type DbtTaskData struct {
diff --git a/plugins/starrocks/starrocks.go b/plugins/starrocks/starrocks.go
index b6f231216..930bc7750 100644
--- a/plugins/starrocks/starrocks.go
+++ b/plugins/starrocks/starrocks.go
@@ -82,6 +82,7 @@ func main() {
        batchSize := cmd.Flags().StringP("batch_size", "b", "", "StarRocks 
insert batch size")
        _ = cmd.MarkFlagRequired("batch_size")
        extra := cmd.Flags().StringP("extra", "e", "", "StarRocks create table 
sql extra")
+       orderBy := cmd.Flags().StringP("order_by", "o", "", "Source tables 
order by, default is primary key")
        cmd.Run = func(cmd *cobra.Command, args []string) {
                runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
                        "source_type": sourceType,
@@ -96,6 +97,7 @@ func main() {
                        "tables":      tables,
                        "batch_size":  batchSize,
                        "extra":       extra,
+                       "order_by":    orderBy,
                })
        }
        runner.RunCmd(cmd)
diff --git a/plugins/starrocks/task_data.go b/plugins/starrocks/task_data.go
index 49e24e7d6..35f555442 100644
--- a/plugins/starrocks/task_data.go
+++ b/plugins/starrocks/task_data.go
@@ -28,7 +28,8 @@ type StarRocksConfig struct {
        BeHost      string `mapstructure:"be_host"`
        BePort      int    `mapstructure:"be_port"`
        Tables      []string
-       BatchSize   int    `mapstructure:"batch_size"`
-       DomainLayer string `mapstructure:"domain_layer"`
-       Extra       string
+       BatchSize   int               `mapstructure:"batch_size"`
+       OrderBy     map[string]string `mapstructure:"order_by"`
+       DomainLayer string            `mapstructure:"domain_layer"`
+       Extra       map[string]string
 }
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index a7d38b4f1..8b21352b0 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -106,20 +106,39 @@ func LoadData(c core.SubTaskContext) errors.Error {
        for _, table := range starrocksTables {
                starrocksTable := strings.TrimLeft(table, "_")
                var columnMap map[string]string
-               columnMap, err = createTable(starrocks, db, starrocksTable, 
table, c, config.Extra)
+               var orderBy string
+               columnMap, orderBy, err = createTable(starrocks, db, 
starrocksTable, table, c, config)
                if err != nil {
                        c.GetLogger().Error(err, "create table %s in starrocks 
error", table)
                        return errors.Convert(err)
                }
-               err = loadData(starrocks, c, starrocksTable, table, columnMap, 
db, config)
+               // try postgre syntax, because we can't get dialect here
+               err := errors.Convert(db.Exec("begin transaction isolation 
level repeatable read"))
+               if err != nil {
+                       // try mysql syntax
+                       err = errors.Convert(db.Exec("set session transaction 
isolation level repeatable read"))
+                       if err != nil {
+                               return err
+                       }
+                       err = errors.Convert(db.Exec("start transaction"))
+                       if err != nil {
+                               return err
+                       }
+
+               }
+               err = errors.Convert(loadData(starrocks, c, starrocksTable, 
table, columnMap, db, config, orderBy))
+               if err != nil {
+                       return errors.Convert(err)
+               }
+               err = errors.Convert(db.Exec("commit"))
                if err != nil {
-                       c.GetLogger().Error(err, "load data %s error", table)
                        return errors.Convert(err)
                }
        }
        return nil
 }
-func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table 
string, c core.SubTaskContext, extra string) (map[string]string, errors.Error) {
+
+func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table 
string, c core.SubTaskContext, config *StarRocksConfig) (map[string]string, 
string, errors.Error) {
        columeMetas, err := db.GetColumns(&Table{name: table}, nil)
        columnMap := make(map[string]string)
        if err != nil {
@@ -127,20 +146,22 @@ func createTable(starrocks *sql.DB, db dal.Dal, 
starrocksTable string, table str
                        c.GetLogger().Warn(err, "skip err: cached plan must not 
change result type")
                        columeMetas, err = db.GetColumns(&Table{name: table}, 
nil)
                        if err != nil {
-                               return nil, errors.Convert(err)
+                               return nil, "", errors.Convert(err)
                        }
                } else {
-                       return nil, errors.Convert(err)
+                       return nil, "", errors.Convert(err)
                }
        }
        var pks []string
+       var orders []string
        var columns []string
        firstcm := ""
+       firstcmName := ""
        for _, cm := range columeMetas {
                name := cm.Name()
                starrocksDatatype, ok := cm.ColumnType()
                if !ok {
-                       return columnMap, errors.Default.New(fmt.Sprintf("Get 
[%s] ColumeType Failed", name))
+                       return columnMap, "", 
errors.Default.New(fmt.Sprintf("Get [%s] ColumeType Failed", name))
                }
                dataType := getDataType(starrocksDatatype)
                columnMap[name] = dataType
@@ -149,26 +170,39 @@ func createTable(starrocks *sql.DB, db dal.Dal, 
starrocksTable string, table str
                isPrimaryKey, ok := cm.PrimaryKey()
                if isPrimaryKey && ok {
                        pks = append(pks, fmt.Sprintf("`%s`", name))
+                       orders = append(orders, name)
                }
                if firstcm == "" {
                        firstcm = fmt.Sprintf("`%s`", name)
+                       firstcmName = name
                }
        }
 
        if len(pks) == 0 {
                pks = append(pks, firstcm)
        }
-
-       if extra == "" {
-               extra = fmt.Sprintf(`engine=olap distributed by hash(%s) 
properties("replication_num" = "1")`, strings.Join(pks, ", "))
+       orderBy := strings.Join(orders, ",")
+       if config.OrderBy != nil {
+               if v, ok := config.OrderBy[table]; ok {
+                       orderBy = v
+               }
+       }
+       if orderBy == "" {
+               orderBy = firstcmName
+       }
+       extra := fmt.Sprintf(`engine=olap distributed by hash(%s) 
properties("replication_num" = "1")`, strings.Join(pks, ", "))
+       if config.Extra != nil {
+               if v, ok := config.Extra[table]; ok {
+                       extra = v
+               }
        }
        tableSql := fmt.Sprintf("create table if not exists `%s` ( %s ) %s", 
starrocksTable, strings.Join(columns, ","), extra)
        c.GetLogger().Info(tableSql)
        _, err = errors.Convert01(starrocks.Exec(tableSql))
-       return columnMap, err
+       return columnMap, orderBy, err
 }
 
-func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, 
table string, columnMap map[string]string, db dal.Dal, config *StarRocksConfig) 
error {
+func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string, 
table string, columnMap map[string]string, db dal.Dal, config *StarRocksConfig, 
orderBy string) error {
        offset := 0
        starrocksTmpTable := starrocksTable + "_tmp"
        // create tmp table in starrocks
@@ -181,7 +215,7 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, 
starrocksTable string, t
                var rows *sql.Rows
                var data []map[string]interface{}
                // select data from db
-               rows, err = db.RawCursor(fmt.Sprintf("select * from %s limit %d 
offset %d", table, config.BatchSize, offset))
+               rows, err = db.RawCursor(fmt.Sprintf("select * from %s order by 
%s limit %d offset %d", table, orderBy, config.BatchSize, offset))
                if err != nil {
                        return err
                }
@@ -298,11 +332,9 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext, 
starrocksTable string, t
        return nil
 }
 
-var (
-       LoadDataTaskMeta = core.SubTaskMeta{
-               Name:             "LoadData",
-               EntryPoint:       LoadData,
-               EnabledByDefault: true,
-               Description:      "Load data to StarRocks",
-       }
-)
+var LoadDataTaskMeta = core.SubTaskMeta{
+       Name:             "LoadData",
+       EntryPoint:       LoadData,
+       EnabledByDefault: true,
+       Description:      "Load data to StarRocks",
+}

Reply via email to