This is an automated email from the ASF dual-hosted git repository. abeizn pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
commit add88b5747fe81c3cf991317503f174e5ed6221e Author: long2ice <[email protected]> AuthorDate: Wed Oct 19 12:02:57 2022 +0800 feat: dbt plugin improvement --- config-ui/src/data/pipeline-config-samples/dbt.js | 4 +- plugins/dbt/api/swagger.go | 42 ++++----- plugins/dbt/dbt.go | 17 ++-- plugins/dbt/tasks/convertor.go | 100 ++++++++++++---------- plugins/dbt/tasks/git.go | 48 +++++++++++ plugins/dbt/tasks/task_data.go | 13 ++- 6 files changed, 141 insertions(+), 83 deletions(-) diff --git a/config-ui/src/data/pipeline-config-samples/dbt.js b/config-ui/src/data/pipeline-config-samples/dbt.js index 292a46d2..fac96b36 100644 --- a/config-ui/src/data/pipeline-config-samples/dbt.js +++ b/config-ui/src/data/pipeline-config-samples/dbt.js @@ -21,13 +21,15 @@ const dbtConfig = [ plugin: 'dbt', options: { projectPath: '/var/www/html/my-project', + projectGitURL: '', projectName: 'myproject', projectTarget: 'dev', selectedModels: ['model_one', 'model_two'], projectVars: { demokey1: 'demovalue1', demokey2: 'demovalue2' - } + }, + args: [] } } ] diff --git a/plugins/dbt/api/swagger.go b/plugins/dbt/api/swagger.go index 841c2870..a14c986c 100644 --- a/plugins/dbt/api/swagger.go +++ b/plugins/dbt/api/swagger.go @@ -25,19 +25,23 @@ package api // @Router /blueprints/dbt/blueprint-plan [post] func _() {} -type DbtBlueprintPlan [][]struct { - Plugin string `json:"plugin"` - Options struct { - ProjectPath string `json:"projectPath"` - ProjectName string `json:"projectName"` - ProjectTarget string `json:"projectTarget"` - SelectedModels []string `json:"selectedModels"` - ProjectVars struct { - Demokey1 string `json:"demokey1"` - Demokey2 string `json:"demokey2"` - } `json:"projectVars"` - } `json:"options"` +type Options struct { + ProjectPath string `json:"projectPath"` + ProjectGitURL string `json:"projectGitURL"` + ProjectName string `json:"projectName"` + ProjectTarget string `json:"projectTarget"` + SelectedModels []string `json:"selectedModels"` + Args []string `json:"args"` + ProjectVars struct { + Demokey1 string `json:"demokey1"` + Demokey2 string `json:"demokey2"` + } `json:"projectVars"` } +type Plan struct { + Plugin string `json:"plugin"` + Options Options `json:"options"` +} +type DbtBlueprintPlan [][]Plan // @Summary pipelines plan for dbt // @Description pipelines plan for dbt @@ -47,16 +51,4 @@ type DbtBlueprintPlan [][]struct { // @Router /pipelines/dbt/pipeline-plan [post] func _() {} -type DbtPipelinePlan [][]struct { - Plugin string `json:"plugin"` - Options struct { - ProjectPath string `json:"projectPath"` - ProjectName string `json:"projectName"` - ProjectTarget string `json:"projectTarget"` - SelectedModels []string `json:"selectedModels"` - ProjectVars struct { - Demokey1 string `json:"demokey1"` - Demokey2 string `json:"demokey2"` - } `json:"projectVars"` - } `json:"options"` -} +type DbtPipelinePlan [][]Plan diff --git a/plugins/dbt/dbt.go b/plugins/dbt/dbt.go index f336cb97..6f95c2c9 100644 --- a/plugins/dbt/dbt.go +++ b/plugins/dbt/dbt.go @@ -27,8 +27,10 @@ import ( "github.com/spf13/cobra" ) -var _ core.PluginMeta = (*Dbt)(nil) -var _ core.PluginTask = (*Dbt)(nil) +var ( + _ core.PluginMeta = (*Dbt)(nil) + _ core.PluginTask = (*Dbt)(nil) +) type Dbt struct{} @@ -38,6 +40,7 @@ func (plugin Dbt) Description() string { func (plugin Dbt) SubTaskMetas() []core.SubTaskMeta { return []core.SubTaskMeta{ + tasks.GitMeta, tasks.DbtConverterMeta, } } @@ -81,16 +84,12 @@ func main() { dbtCmd := &cobra.Command{Use: "dbt"} _ = dbtCmd.MarkFlagRequired("projectPath") projectPath := dbtCmd.Flags().StringP("projectPath", "p", "/Users/abeizn/demoapp", "user dbt project directory.") - - _ = dbtCmd.MarkFlagRequired("projectName") + projectGitURL := dbtCmd.Flags().StringP("projectGitURL", "g", "", "user dbt project git url.") projectName := dbtCmd.Flags().StringP("projectName", "n", "demoapp", "user dbt project name.") - projectTarget := dbtCmd.Flags().StringP("projectTarget", "o", "dev", "this is the default target your dbt project will use.") - - _ = dbtCmd.MarkFlagRequired("selectedModels") modelsSlice := []string{"my_first_dbt_model", "my_second_dbt_model"} selectedModels := dbtCmd.Flags().StringSliceP("models", "m", modelsSlice, "dbt select models") - + dbtArgs := dbtCmd.Flags().StringSliceP("args", "a", []string{}, "dbt run args") projectVars := make(map[string]string) projectVars["event_min_id"] = "7581" projectVars["event_max_id"] = "7582" @@ -107,6 +106,8 @@ func main() { "projectTarget": *projectTarget, "selectedModels": *selectedModels, "projectVars": projectVarsConvert, + "projectGitURL": *projectGitURL, + "args": dbtArgs, }) } runner.RunCmd(dbtCmd) diff --git a/plugins/dbt/tasks/convertor.go b/plugins/dbt/tasks/convertor.go index a0ca1004..265fd5c1 100644 --- a/plugins/dbt/tasks/convertor.go +++ b/plugins/dbt/tasks/convertor.go @@ -20,7 +20,6 @@ package tasks import ( "bufio" "encoding/json" - "github.com/apache/incubator-devlake/errors" "net" "net/url" "os" @@ -28,6 +27,8 @@ import ( "strconv" "strings" + "github.com/apache/incubator-devlake/errors" + "github.com/apache/incubator-devlake/plugins/core" "github.com/spf13/viper" ) @@ -41,60 +42,66 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error { projectName := data.Options.ProjectName projectTarget := data.Options.ProjectTarget projectVars := data.Options.ProjectVars - dbUrl := taskCtx.GetConfig("DB_URL") - u, err := errors.Convert01(url.Parse(dbUrl)) + args := data.Options.Args + err := errors.Convert(os.Chdir(projectPath)) if err != nil { return err } - dbType := u.Scheme - dbUsername := u.User.Username() - dbPassword, _ := u.User.Password() - dbServer, dbPort, _ := net.SplitHostPort(u.Host) - dbDataBase := u.Path[1:] - var dbSchema string - flag := strings.Compare(dbType, "mysql") - if flag == 0 { - // mysql database - dbSchema = dbDataBase - } else { - // other database - mapQuery, err := errors.Convert01(url.ParseQuery(u.RawQuery)) + _, err = errors.Convert01(os.Stat("profiles.yml")) + // if profiles.yml not exist, create it manually + if err != nil { + dbUrl := taskCtx.GetConfig("DB_URL") + u, err := errors.Convert01(url.Parse(dbUrl)) if err != nil { return err } - if value, ok := mapQuery["search_path"]; ok { - if len(value) < 1 { - return errors.Default.New("DB_URL search_path parses error") + dbType := u.Scheme + dbUsername := u.User.Username() + dbPassword, _ := u.User.Password() + dbServer, dbPort, _ := net.SplitHostPort(u.Host) + dbDataBase := u.Path[1:] + var dbSchema string + flag := strings.Compare(dbType, "mysql") + if flag == 0 { + // mysql database + dbSchema = dbDataBase + } else { + // other database + mapQuery, err := errors.Convert01(url.ParseQuery(u.RawQuery)) + if err != nil { + return err } - dbSchema = value[0] + if value, ok := mapQuery["search_path"]; ok { + if len(value) < 1 { + return errors.Default.New("DB_URL search_path parses error") + } + dbSchema = value[0] + } else { + dbSchema = "public" + } + } + config := viper.New() + config.Set(projectName+".target", projectTarget) + config.Set(projectName+".outputs."+projectTarget+".type", dbType) + dbPortInt, _ := strconv.Atoi(dbPort) + config.Set(projectName+".outputs."+projectTarget+".port", dbPortInt) + config.Set(projectName+".outputs."+projectTarget+".password", dbPassword) + config.Set(projectName+".outputs."+projectTarget+".schema", dbSchema) + if flag == 0 { + config.Set(projectName+".outputs."+projectTarget+".server", dbServer) + config.Set(projectName+".outputs."+projectTarget+".username", dbUsername) + config.Set(projectName+".outputs."+projectTarget+".database", dbDataBase) } else { - dbSchema = "public" + config.Set(projectName+".outputs."+projectTarget+".host", dbServer) + config.Set(projectName+".outputs."+projectTarget+".user", dbUsername) + config.Set(projectName+".outputs."+projectTarget+".dbname", dbDataBase) + } + err = errors.Convert(config.WriteConfigAs("profiles.yml")) + if err != nil { + return err } } - err = errors.Convert(os.Chdir(projectPath)) - if err != nil { - return err - } - config := viper.New() - config.Set(projectName+".target", projectTarget) - config.Set(projectName+".outputs."+projectTarget+".type", dbType) - dbPortInt, _ := strconv.Atoi(dbPort) - config.Set(projectName+".outputs."+projectTarget+".port", dbPortInt) - config.Set(projectName+".outputs."+projectTarget+".password", dbPassword) - config.Set(projectName+".outputs."+projectTarget+".schema", dbSchema) - if flag == 0 { - config.Set(projectName+".outputs."+projectTarget+".server", dbServer) - config.Set(projectName+".outputs."+projectTarget+".username", dbUsername) - config.Set(projectName+".outputs."+projectTarget+".database", dbDataBase) - } else { - config.Set(projectName+".outputs."+projectTarget+".host", dbServer) - config.Set(projectName+".outputs."+projectTarget+".user", dbUsername) - config.Set(projectName+".outputs."+projectTarget+".dbname", dbDataBase) - } - err = errors.Convert(config.WriteConfigAs("profiles.yml")) - if err != nil { - return err - } + // if package.yml exist, install dbt dependencies _, err = errors.Convert01(os.Stat("packages.yml")) if err == nil { cmd := exec.Command("dbt", "deps") @@ -114,6 +121,9 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error { } dbtExecParams = append(dbtExecParams, "--select") dbtExecParams = append(dbtExecParams, models...) + if args != nil { + dbtExecParams = append(dbtExecParams, args...) + } cmd := exec.Command(dbtExecParams[0], dbtExecParams[1:]...) log.Info("dbt run script: %v", cmd) stdout, _ := cmd.StdoutPipe() diff --git a/plugins/dbt/tasks/git.go b/plugins/dbt/tasks/git.go new file mode 100644 index 00000000..d0e63d86 --- /dev/null +++ b/plugins/dbt/tasks/git.go @@ -0,0 +1,48 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package tasks + +import ( + "os/exec" + + "github.com/apache/incubator-devlake/errors" + "github.com/apache/incubator-devlake/plugins/core" +) + +func Git(taskCtx core.SubTaskContext) errors.Error { + logger := taskCtx.GetLogger() + data := taskCtx.GetData().(*DbtTaskData) + if data.Options.ProjectGitURL == "" { + return nil + } + cmd := exec.Command("git", "clone", data.Options.ProjectGitURL, data.Options.ProjectPath) + logger.Info("start clone dbt project: %v", cmd) + out, err := cmd.CombinedOutput() + if err != nil { + logger.Error(err, "clone dbt project failed") + return errors.Convert(err) + } + logger.Info("clone dbt project success: %v", string(out)) + return nil +} + +var GitMeta = core.SubTaskMeta{ + Name: "Git", + EntryPoint: Git, + EnabledByDefault: true, + Description: "Clone dbt project from git", +} diff --git a/plugins/dbt/tasks/task_data.go b/plugins/dbt/tasks/task_data.go index 7733d836..7bb20a70 100644 --- a/plugins/dbt/tasks/task_data.go +++ b/plugins/dbt/tasks/task_data.go @@ -18,12 +18,17 @@ limitations under the License. package tasks type DbtOptions struct { - ProjectPath string `json:"projectPath"` - ProjectName string `json:"projectName"` - ProjectTarget string `json:"projectTarget"` + ProjectPath string `json:"projectPath"` + ProjectName string `json:"projectName"` + ProjectTarget string `json:"projectTarget"` + // clone from git to projectPath if projectGitURL is not empty + ProjectGitURL string `json:"projectGitURL"` + // deprecated, use args instead ProjectVars map[string]interface{} `json:"projectVars"` SelectedModels []string `json:"selectedModels"` - Tasks []string `json:"tasks,omitempty"` + // dbt run args + Args []string `json:"args"` + Tasks []string `json:"tasks,omitempty"` } type DbtTaskData struct {
