This is an automated email from the ASF dual-hosted git repository.
zhangliang2022 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-devlake.git
The following commit(s) were added to refs/heads/main by this push:
new c4268567a fix: maybe can fix dbt zombie processes (#4101)
c4268567a is described below
commit c4268567a8283e45489d411d8d3bba58c8289953
Author: abeizn <[email protected]>
AuthorDate: Wed Jan 4 18:26:43 2023 +0800
fix: maybe can fix dbt zombie processes (#4101)
* fix: maybe can fix dbt zombie processes
* fix: maybe can fix dbt zombie processes
---
plugins/dbt/tasks/convertor.go | 17 +++++++++++++----
1 file changed, 13 insertions(+), 4 deletions(-)
diff --git a/plugins/dbt/tasks/convertor.go b/plugins/dbt/tasks/convertor.go
index 28e2aae81..7115e660f 100644
--- a/plugins/dbt/tasks/convertor.go
+++ b/plugins/dbt/tasks/convertor.go
@@ -181,10 +181,9 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
dbtExecParams = append(dbtExecParams, profile)
}
cmd := exec.Command(dbtExecParams[0], dbtExecParams[1:]...)
- log.Info("dbt run script: %v", cmd)
+ log.Info("dbt run script: ", cmd)
stdout, _ := cmd.StdoutPipe()
- err = errors.Convert(cmd.Start())
- if err != nil {
+ if err = errors.Convert(cmd.Start()); err != nil {
return err
}
// ProcessState contains information about an exited process, available after a call to Wait.
@@ -195,7 +194,12 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
}()
// prevent zombie process
- defer cmd.Wait() //nolint
+ defer func() {
+ err := errors.Convert(cmd.Wait())
+ if err != nil {
+ log.Error(nil, "dbt run cmd.Wait() error")
+ }
+ }()
scanner := bufio.NewScanner(stdout)
var errStr string
@@ -213,6 +217,11 @@ func DbtConverter(taskCtx core.SubTaskContext) errors.Error {
return err
}
+ // close stdout
+ if closeErr := stdout.Close(); closeErr != nil && err == nil {
+ return errors.Convert(closeErr)
+ }
+
return nil
}