This is an automated email from the ASF dual-hosted git repository.
abeizn pushed a commit to branch release-v0.14
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/release-v0.14 by this push:
new d0e43565a feat: cherrypick dbt and starrocks plugin improvement (#3523)
d0e43565a is described below
commit d0e43565ad8105e7cfbe1d9d9bb419db0444809f
Author: long2ice <[email protected]>
AuthorDate: Fri Oct 21 10:29:02 2022 +0800
feat: cherrypick dbt and starrocks plugin improvement (#3523)
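* feat: dbt plugin improvements (optionally clone the project from git, pass
extra args to `dbt run`, reuse an existing profiles.yml)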
* feat: add ORDER BY to the starrocks plugin's source queries and load data
inside a repeatable-read transaction
---
config-ui/src/data/pipeline-config-samples/dbt.js | 4 +-
.../src/data/pipeline-config-samples/starrocks.js | 3 +-
plugins/dbt/api/swagger.go | 42 ++++-----
plugins/dbt/dbt.go | 24 +++--
plugins/dbt/tasks/convertor.go | 100 +++++++++++----------
plugins/dbt/tasks/{task_data.go => git.go} | 36 ++++++--
plugins/dbt/tasks/task_data.go | 13 ++-
plugins/starrocks/starrocks.go | 2 +
plugins/starrocks/task_data.go | 7 +-
plugins/starrocks/tasks.go | 74 ++++++++++-----
10 files changed, 182 insertions(+), 123 deletions(-)
diff --git a/config-ui/src/data/pipeline-config-samples/dbt.js
b/config-ui/src/data/pipeline-config-samples/dbt.js
index 292a46d25..fac96b365 100644
--- a/config-ui/src/data/pipeline-config-samples/dbt.js
+++ b/config-ui/src/data/pipeline-config-samples/dbt.js
@@ -21,13 +21,15 @@ const dbtConfig = [
plugin: 'dbt',
options: {
projectPath: '/var/www/html/my-project',
+ projectGitURL: '',
projectName: 'myproject',
projectTarget: 'dev',
selectedModels: ['model_one', 'model_two'],
projectVars: {
demokey1: 'demovalue1',
demokey2: 'demovalue2'
- }
+ },
+ args: []
}
}
]
diff --git a/config-ui/src/data/pipeline-config-samples/starrocks.js
b/config-ui/src/data/pipeline-config-samples/starrocks.js
index 150e0efce..a49321063 100644
--- a/config-ui/src/data/pipeline-config-samples/starrocks.js
+++ b/config-ui/src/data/pipeline-config-samples/starrocks.js
@@ -31,7 +31,8 @@ const starRocksConfig = [
be_port: 8040,
tables: ['_tool_.*'], // support regexp
batch_size: 10000,
- extra: '', // will append to create table sql
+ order_by: {},
+ extra: {}, // will append to create table sql
domain_layer: '' // priority over tables
}
}
diff --git a/plugins/dbt/api/swagger.go b/plugins/dbt/api/swagger.go
index 841c2870a..a14c986cc 100644
--- a/plugins/dbt/api/swagger.go
+++ b/plugins/dbt/api/swagger.go
@@ -25,19 +25,23 @@ package api
// @Router /blueprints/dbt/blueprint-plan [post]
func _() {}
-type DbtBlueprintPlan [][]struct {
- Plugin string `json:"plugin"`
- Options struct {
- ProjectPath string `json:"projectPath"`
- ProjectName string `json:"projectName"`
- ProjectTarget string `json:"projectTarget"`
- SelectedModels []string `json:"selectedModels"`
- ProjectVars struct {
- Demokey1 string `json:"demokey1"`
- Demokey2 string `json:"demokey2"`
- } `json:"projectVars"`
- } `json:"options"`
+type Options struct {
+ ProjectPath string `json:"projectPath"`
+ ProjectGitURL string `json:"projectGitURL"`
+ ProjectName string `json:"projectName"`
+ ProjectTarget string `json:"projectTarget"`
+ SelectedModels []string `json:"selectedModels"`
+ Args []string `json:"args"`
+ ProjectVars struct {
+ Demokey1 string `json:"demokey1"`
+ Demokey2 string `json:"demokey2"`
+ } `json:"projectVars"`
}
+type Plan struct {
+ Plugin string `json:"plugin"`
+ Options Options `json:"options"`
+}
+type DbtBlueprintPlan [][]Plan
// @Summary pipelines plan for dbt
// @Description pipelines plan for dbt
@@ -47,16 +51,4 @@ type DbtBlueprintPlan [][]struct {
// @Router /pipelines/dbt/pipeline-plan [post]
func _() {}
-type DbtPipelinePlan [][]struct {
- Plugin string `json:"plugin"`
- Options struct {
- ProjectPath string `json:"projectPath"`
- ProjectName string `json:"projectName"`
- ProjectTarget string `json:"projectTarget"`
- SelectedModels []string `json:"selectedModels"`
- ProjectVars struct {
- Demokey1 string `json:"demokey1"`
- Demokey2 string `json:"demokey2"`
- } `json:"projectVars"`
- } `json:"options"`
-}
+type DbtPipelinePlan [][]Plan
diff --git a/plugins/dbt/dbt.go b/plugins/dbt/dbt.go
index f336cb977..1a3c19cc0 100644
--- a/plugins/dbt/dbt.go
+++ b/plugins/dbt/dbt.go
@@ -27,8 +27,10 @@ import (
"github.com/spf13/cobra"
)
-var _ core.PluginMeta = (*Dbt)(nil)
-var _ core.PluginTask = (*Dbt)(nil)
+var (
+ _ core.PluginMeta = (*Dbt)(nil)
+ _ core.PluginTask = (*Dbt)(nil)
+)
type Dbt struct{}
@@ -38,6 +40,7 @@ func (plugin Dbt) Description() string {
func (plugin Dbt) SubTaskMetas() []core.SubTaskMeta {
return []core.SubTaskMeta{
+ tasks.GitMeta,
tasks.DbtConverterMeta,
}
}
@@ -55,15 +58,10 @@ func (plugin Dbt) PrepareTaskData(taskCtx core.TaskContext,
options map[string]i
if op.ProjectPath == "" {
return nil, errors.Default.New("projectPath is required for dbt
plugin")
}
- if op.ProjectName == "" {
- return nil, errors.Default.New("projectName is required for dbt
plugin")
- }
+
if op.ProjectTarget == "" {
op.ProjectTarget = "dev"
}
- if op.SelectedModels == nil {
- return nil, errors.Default.New("selectedModels is required for
dbt plugin")
- }
return &tasks.DbtTaskData{
Options: &op,
@@ -81,16 +79,12 @@ func main() {
dbtCmd := &cobra.Command{Use: "dbt"}
_ = dbtCmd.MarkFlagRequired("projectPath")
projectPath := dbtCmd.Flags().StringP("projectPath", "p",
"/Users/abeizn/demoapp", "user dbt project directory.")
-
- _ = dbtCmd.MarkFlagRequired("projectName")
+ projectGitURL := dbtCmd.Flags().StringP("projectGitURL", "g", "", "user
dbt project git url.")
projectName := dbtCmd.Flags().StringP("projectName", "n", "demoapp",
"user dbt project name.")
-
projectTarget := dbtCmd.Flags().StringP("projectTarget", "o", "dev",
"this is the default target your dbt project will use.")
-
- _ = dbtCmd.MarkFlagRequired("selectedModels")
modelsSlice := []string{"my_first_dbt_model", "my_second_dbt_model"}
selectedModels := dbtCmd.Flags().StringSliceP("models", "m",
modelsSlice, "dbt select models")
-
+ dbtArgs := dbtCmd.Flags().StringSliceP("args", "a", []string{}, "dbt
run args")
projectVars := make(map[string]string)
projectVars["event_min_id"] = "7581"
projectVars["event_max_id"] = "7582"
@@ -107,6 +101,8 @@ func main() {
"projectTarget": *projectTarget,
"selectedModels": *selectedModels,
"projectVars": projectVarsConvert,
+ "projectGitURL": *projectGitURL,
+ "args": dbtArgs,
})
}
runner.RunCmd(dbtCmd)
diff --git a/plugins/dbt/tasks/convertor.go b/plugins/dbt/tasks/convertor.go
index 963b0ff78..1498e3c06 100644
--- a/plugins/dbt/tasks/convertor.go
+++ b/plugins/dbt/tasks/convertor.go
@@ -20,7 +20,6 @@ package tasks
import (
"bufio"
"encoding/json"
- "github.com/apache/incubator-devlake/errors"
"net"
"net/url"
"os"
@@ -28,6 +27,8 @@ import (
"strconv"
"strings"
+ "github.com/apache/incubator-devlake/errors"
+
"github.com/apache/incubator-devlake/plugins/core"
"github.com/spf13/viper"
)
@@ -41,60 +42,66 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error
{
projectName := data.Options.ProjectName
projectTarget := data.Options.ProjectTarget
projectVars := data.Options.ProjectVars
- dbUrl := taskCtx.GetConfig("DB_URL")
- u, err := errors.Convert01(url.Parse(dbUrl))
+ args := data.Options.Args
+ err := errors.Convert(os.Chdir(projectPath))
if err != nil {
return err
}
- dbType := u.Scheme
- dbUsername := u.User.Username()
- dbPassword, _ := u.User.Password()
- dbServer, dbPort, _ := net.SplitHostPort(u.Host)
- dbDataBase := u.Path[1:]
- var dbSchema string
- flag := strings.Compare(dbType, "mysql")
- if flag == 0 {
- // mysql database
- dbSchema = dbDataBase
- } else {
- // other database
- mapQuery, err := errors.Convert01(url.ParseQuery(u.RawQuery))
+ _, err = errors.Convert01(os.Stat("profiles.yml"))
+ // if profiles.yml not exist, create it manually
+ if err != nil {
+ dbUrl := taskCtx.GetConfig("DB_URL")
+ u, err := errors.Convert01(url.Parse(dbUrl))
if err != nil {
return err
}
- if value, ok := mapQuery["search_path"]; ok {
- if len(value) < 1 {
- return errors.Default.New("DB_URL search_path
parses error")
+ dbType := u.Scheme
+ dbUsername := u.User.Username()
+ dbPassword, _ := u.User.Password()
+ dbServer, dbPort, _ := net.SplitHostPort(u.Host)
+ dbDataBase := u.Path[1:]
+ var dbSchema string
+ flag := strings.Compare(dbType, "mysql")
+ if flag == 0 {
+ // mysql database
+ dbSchema = dbDataBase
+ } else {
+ // other database
+ mapQuery, err :=
errors.Convert01(url.ParseQuery(u.RawQuery))
+ if err != nil {
+ return err
}
- dbSchema = value[0]
+ if value, ok := mapQuery["search_path"]; ok {
+ if len(value) < 1 {
+ return errors.Default.New("DB_URL
search_path parses error")
+ }
+ dbSchema = value[0]
+ } else {
+ dbSchema = "public"
+ }
+ }
+ config := viper.New()
+ config.Set(projectName+".target", projectTarget)
+ config.Set(projectName+".outputs."+projectTarget+".type",
dbType)
+ dbPortInt, _ := strconv.Atoi(dbPort)
+ config.Set(projectName+".outputs."+projectTarget+".port",
dbPortInt)
+ config.Set(projectName+".outputs."+projectTarget+".password",
dbPassword)
+ config.Set(projectName+".outputs."+projectTarget+".schema",
dbSchema)
+ if flag == 0 {
+
config.Set(projectName+".outputs."+projectTarget+".server", dbServer)
+
config.Set(projectName+".outputs."+projectTarget+".username", dbUsername)
+
config.Set(projectName+".outputs."+projectTarget+".database", dbDataBase)
} else {
- dbSchema = "public"
+
config.Set(projectName+".outputs."+projectTarget+".host", dbServer)
+
config.Set(projectName+".outputs."+projectTarget+".user", dbUsername)
+
config.Set(projectName+".outputs."+projectTarget+".dbname", dbDataBase)
+ }
+ err = errors.Convert(config.WriteConfigAs("profiles.yml"))
+ if err != nil {
+ return err
}
}
- err = errors.Convert(os.Chdir(projectPath))
- if err != nil {
- return err
- }
- config := viper.New()
- config.Set(projectName+".target", projectTarget)
- config.Set(projectName+".outputs."+projectTarget+".type", dbType)
- dbPortInt, _ := strconv.Atoi(dbPort)
- config.Set(projectName+".outputs."+projectTarget+".port", dbPortInt)
- config.Set(projectName+".outputs."+projectTarget+".password",
dbPassword)
- config.Set(projectName+".outputs."+projectTarget+".schema", dbSchema)
- if flag == 0 {
- config.Set(projectName+".outputs."+projectTarget+".server",
dbServer)
- config.Set(projectName+".outputs."+projectTarget+".username",
dbUsername)
- config.Set(projectName+".outputs."+projectTarget+".database",
dbDataBase)
- } else {
- config.Set(projectName+".outputs."+projectTarget+".host",
dbServer)
- config.Set(projectName+".outputs."+projectTarget+".user",
dbUsername)
- config.Set(projectName+".outputs."+projectTarget+".dbname",
dbDataBase)
- }
- err = errors.Convert(config.WriteConfigAs("profiles.yml"))
- if err != nil {
- return err
- }
+ // if package.yml exist, install dbt dependencies
_, err = errors.Convert01(os.Stat("packages.yml"))
if err == nil {
cmd := exec.Command("dbt", "deps")
@@ -114,6 +121,9 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error
{
}
dbtExecParams = append(dbtExecParams, "--select")
dbtExecParams = append(dbtExecParams, models...)
+ if args != nil {
+ dbtExecParams = append(dbtExecParams, args...)
+ }
cmd := exec.Command(dbtExecParams[0], dbtExecParams[1:]...)
log.Info("dbt run script: ", cmd)
stdout, _ := cmd.StdoutPipe()
diff --git a/plugins/dbt/tasks/task_data.go b/plugins/dbt/tasks/git.go
similarity index 50%
copy from plugins/dbt/tasks/task_data.go
copy to plugins/dbt/tasks/git.go
index 7733d8364..2c481e39a 100644
--- a/plugins/dbt/tasks/task_data.go
+++ b/plugins/dbt/tasks/git.go
@@ -17,15 +17,33 @@ limitations under the License.
package tasks
-type DbtOptions struct {
- ProjectPath string `json:"projectPath"`
- ProjectName string `json:"projectName"`
- ProjectTarget string `json:"projectTarget"`
- ProjectVars map[string]interface{} `json:"projectVars"`
- SelectedModels []string `json:"selectedModels"`
- Tasks []string `json:"tasks,omitempty"`
+import (
+ "os/exec"
+
+ "github.com/apache/incubator-devlake/errors"
+ "github.com/apache/incubator-devlake/plugins/core"
+)
+
+func Git(taskCtx core.SubTaskContext) errors.Error {
+ logger := taskCtx.GetLogger()
+ data := taskCtx.GetData().(*DbtTaskData)
+ if data.Options.ProjectGitURL == "" {
+ return nil
+ }
+ cmd := exec.Command("git", "clone", data.Options.ProjectGitURL)
+ logger.Info("start clone dbt project: %v", cmd)
+ out, err := cmd.CombinedOutput()
+ if err != nil {
+ logger.Error(err, "clone dbt project failed")
+ return errors.Convert(err)
+ }
+ logger.Info("clone dbt project success: %v", string(out))
+ return nil
}
-type DbtTaskData struct {
- Options *DbtOptions
+var GitMeta = core.SubTaskMeta{
+ Name: "Git",
+ EntryPoint: Git,
+ EnabledByDefault: true,
+ Description: "Clone dbt project from git",
}
diff --git a/plugins/dbt/tasks/task_data.go b/plugins/dbt/tasks/task_data.go
index 7733d8364..7bb20a702 100644
--- a/plugins/dbt/tasks/task_data.go
+++ b/plugins/dbt/tasks/task_data.go
@@ -18,12 +18,17 @@ limitations under the License.
package tasks
type DbtOptions struct {
- ProjectPath string `json:"projectPath"`
- ProjectName string `json:"projectName"`
- ProjectTarget string `json:"projectTarget"`
+ ProjectPath string `json:"projectPath"`
+ ProjectName string `json:"projectName"`
+ ProjectTarget string `json:"projectTarget"`
+ // clone from git to projectPath if projectGitURL is not empty
+ ProjectGitURL string `json:"projectGitURL"`
+ // deprecated, use args instead
ProjectVars map[string]interface{} `json:"projectVars"`
SelectedModels []string `json:"selectedModels"`
- Tasks []string `json:"tasks,omitempty"`
+ // dbt run args
+ Args []string `json:"args"`
+ Tasks []string `json:"tasks,omitempty"`
}
type DbtTaskData struct {
diff --git a/plugins/starrocks/starrocks.go b/plugins/starrocks/starrocks.go
index b6f231216..930bc7750 100644
--- a/plugins/starrocks/starrocks.go
+++ b/plugins/starrocks/starrocks.go
@@ -82,6 +82,7 @@ func main() {
batchSize := cmd.Flags().StringP("batch_size", "b", "", "StarRocks
insert batch size")
_ = cmd.MarkFlagRequired("batch_size")
extra := cmd.Flags().StringP("extra", "e", "", "StarRocks create table
sql extra")
+ orderBy := cmd.Flags().StringP("order_by", "o", "", "Source tables
order by, default is primary key")
cmd.Run = func(cmd *cobra.Command, args []string) {
runner.DirectRun(cmd, args, PluginEntry, map[string]interface{}{
"source_type": sourceType,
@@ -96,6 +97,7 @@ func main() {
"tables": tables,
"batch_size": batchSize,
"extra": extra,
+ "order_by": orderBy,
})
}
runner.RunCmd(cmd)
diff --git a/plugins/starrocks/task_data.go b/plugins/starrocks/task_data.go
index 49e24e7d6..35f555442 100644
--- a/plugins/starrocks/task_data.go
+++ b/plugins/starrocks/task_data.go
@@ -28,7 +28,8 @@ type StarRocksConfig struct {
BeHost string `mapstructure:"be_host"`
BePort int `mapstructure:"be_port"`
Tables []string
- BatchSize int `mapstructure:"batch_size"`
- DomainLayer string `mapstructure:"domain_layer"`
- Extra string
+ BatchSize int `mapstructure:"batch_size"`
+ OrderBy map[string]string `mapstructure:"order_by"`
+ DomainLayer string `mapstructure:"domain_layer"`
+ Extra map[string]string
}
diff --git a/plugins/starrocks/tasks.go b/plugins/starrocks/tasks.go
index a7d38b4f1..8b21352b0 100644
--- a/plugins/starrocks/tasks.go
+++ b/plugins/starrocks/tasks.go
@@ -106,20 +106,39 @@ func LoadData(c core.SubTaskContext) errors.Error {
for _, table := range starrocksTables {
starrocksTable := strings.TrimLeft(table, "_")
var columnMap map[string]string
- columnMap, err = createTable(starrocks, db, starrocksTable,
table, c, config.Extra)
+ var orderBy string
+ columnMap, orderBy, err = createTable(starrocks, db,
starrocksTable, table, c, config)
if err != nil {
c.GetLogger().Error(err, "create table %s in starrocks
error", table)
return errors.Convert(err)
}
- err = loadData(starrocks, c, starrocksTable, table, columnMap,
db, config)
+ // try postgre syntax, because we can't get dialect here
+ err := errors.Convert(db.Exec("begin transaction isolation
level repeatable read"))
+ if err != nil {
+ // try mysql syntax
+ err = errors.Convert(db.Exec("set session transaction
isolation level repeatable read"))
+ if err != nil {
+ return err
+ }
+ err = errors.Convert(db.Exec("start transaction"))
+ if err != nil {
+ return err
+ }
+
+ }
+ err = errors.Convert(loadData(starrocks, c, starrocksTable,
table, columnMap, db, config, orderBy))
+ if err != nil {
+ return errors.Convert(err)
+ }
+ err = errors.Convert(db.Exec("commit"))
if err != nil {
- c.GetLogger().Error(err, "load data %s error", table)
return errors.Convert(err)
}
}
return nil
}
-func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table
string, c core.SubTaskContext, extra string) (map[string]string, errors.Error) {
+
+func createTable(starrocks *sql.DB, db dal.Dal, starrocksTable string, table
string, c core.SubTaskContext, config *StarRocksConfig) (map[string]string,
string, errors.Error) {
columeMetas, err := db.GetColumns(&Table{name: table}, nil)
columnMap := make(map[string]string)
if err != nil {
@@ -127,20 +146,22 @@ func createTable(starrocks *sql.DB, db dal.Dal,
starrocksTable string, table str
c.GetLogger().Warn(err, "skip err: cached plan must not
change result type")
columeMetas, err = db.GetColumns(&Table{name: table},
nil)
if err != nil {
- return nil, errors.Convert(err)
+ return nil, "", errors.Convert(err)
}
} else {
- return nil, errors.Convert(err)
+ return nil, "", errors.Convert(err)
}
}
var pks []string
+ var orders []string
var columns []string
firstcm := ""
+ firstcmName := ""
for _, cm := range columeMetas {
name := cm.Name()
starrocksDatatype, ok := cm.ColumnType()
if !ok {
- return columnMap, errors.Default.New(fmt.Sprintf("Get
[%s] ColumeType Failed", name))
+ return columnMap, "",
errors.Default.New(fmt.Sprintf("Get [%s] ColumeType Failed", name))
}
dataType := getDataType(starrocksDatatype)
columnMap[name] = dataType
@@ -149,26 +170,39 @@ func createTable(starrocks *sql.DB, db dal.Dal,
starrocksTable string, table str
isPrimaryKey, ok := cm.PrimaryKey()
if isPrimaryKey && ok {
pks = append(pks, fmt.Sprintf("`%s`", name))
+ orders = append(orders, name)
}
if firstcm == "" {
firstcm = fmt.Sprintf("`%s`", name)
+ firstcmName = name
}
}
if len(pks) == 0 {
pks = append(pks, firstcm)
}
-
- if extra == "" {
- extra = fmt.Sprintf(`engine=olap distributed by hash(%s)
properties("replication_num" = "1")`, strings.Join(pks, ", "))
+ orderBy := strings.Join(orders, ",")
+ if config.OrderBy != nil {
+ if v, ok := config.OrderBy[table]; ok {
+ orderBy = v
+ }
+ }
+ if orderBy == "" {
+ orderBy = firstcmName
+ }
+ extra := fmt.Sprintf(`engine=olap distributed by hash(%s)
properties("replication_num" = "1")`, strings.Join(pks, ", "))
+ if config.Extra != nil {
+ if v, ok := config.Extra[table]; ok {
+ extra = v
+ }
}
tableSql := fmt.Sprintf("create table if not exists `%s` ( %s ) %s",
starrocksTable, strings.Join(columns, ","), extra)
c.GetLogger().Info(tableSql)
_, err = errors.Convert01(starrocks.Exec(tableSql))
- return columnMap, err
+ return columnMap, orderBy, err
}
-func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string,
table string, columnMap map[string]string, db dal.Dal, config *StarRocksConfig)
error {
+func loadData(starrocks *sql.DB, c core.SubTaskContext, starrocksTable string,
table string, columnMap map[string]string, db dal.Dal, config *StarRocksConfig,
orderBy string) error {
offset := 0
starrocksTmpTable := starrocksTable + "_tmp"
// create tmp table in starrocks
@@ -181,7 +215,7 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext,
starrocksTable string, t
var rows *sql.Rows
var data []map[string]interface{}
// select data from db
- rows, err = db.RawCursor(fmt.Sprintf("select * from %s limit %d
offset %d", table, config.BatchSize, offset))
+ rows, err = db.RawCursor(fmt.Sprintf("select * from %s order by
%s limit %d offset %d", table, orderBy, config.BatchSize, offset))
if err != nil {
return err
}
@@ -298,11 +332,9 @@ func loadData(starrocks *sql.DB, c core.SubTaskContext,
starrocksTable string, t
return nil
}
-var (
- LoadDataTaskMeta = core.SubTaskMeta{
- Name: "LoadData",
- EntryPoint: LoadData,
- EnabledByDefault: true,
- Description: "Load data to StarRocks",
- }
-)
+var LoadDataTaskMeta = core.SubTaskMeta{
+ Name: "LoadData",
+ EntryPoint: LoadData,
+ EnabledByDefault: true,
+ Description: "Load data to StarRocks",
+}