lostluck commented on code in PR #16658:
URL: https://github.com/apache/beam/pull/16658#discussion_r991574892
##########
sdks/python/container/boot.go:
##########
@@ -73,13 +74,20 @@ const (
)
func main() {
+ if err := mainError(); err != nil {
+ log.Print(err)
+ os.Exit(1)
Review Comment:
This is already what happens when log.Fatal is called, so it's unclear
why this whole change was made.
https://pkg.go.dev/log#Fatal
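For context, the one behavioral difference I can see is deferred cleanup. log.Fatal is documented as Print followed by os.Exit(1), and os.Exit does not run deferred functions. A minimal sketch of the error-returning pattern, where `run` and its `cleaned` flag are hypothetical stand-ins rather than anything in boot.go:

```go
package main

import (
	"errors"
	"fmt"
)

// run is a hypothetical stand-in for the body of main. It returns an error
// instead of calling log.Fatal, so the deferred cleanup executes on the way out.
func run(cleaned *bool) error {
	defer func() { *cleaned = true }()
	return errors.New("simulated failure")
}

func main() {
	cleaned := false
	err := run(&cleaned)
	// By the time the caller sees the error, the deferred cleanup has run.
	// A real main would now log the error and call os.Exit(1); had run called
	// log.Fatal itself, the process would have exited before the defer fired.
	fmt.Printf("err=%v cleaned=%v\n", err, cleaned)
}
```

If that is the motivation, it only needs to cover the code that actually has defers pending.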
##########
sdks/python/container/boot.go:
##########
@@ -137,46 +145,49 @@ func main() {
options, err := provision.ProtoToJSON(info.GetPipelineOptions())
if err != nil {
- log.Fatalf("Failed to convert pipeline options: %v", err)
+ return fmt.Errorf("Failed to convert pipeline options: %v", err)
}
// (2) Retrieve and install the staged packages.
//
- // Guard from concurrent artifact retrieval and installation,
- // when called by child processes in a worker pool.
+ // No log.Fatalf() from here on, otherwise deferred cleanups will not be called!
Review Comment:
Then why not simply put the parts that require deferred cleanups in a function,
instead of heavy-handedly making all of main a new function with log.Fatal?
In this case, I'd put the latter half into a function
"launchWorkerProcesses", which handles the portions that require defer, with the
artifact/provision contract portions handled in a "setupBeamRequirements" or
similarly named function. This would make the "phases" obvious in main.
##########
sdks/python/container/boot.go:
##########
@@ -73,13 +74,20 @@ const (
)
func main() {
+ if err := mainError(); err != nil {
+ log.Print(err)
+ os.Exit(1)
+ }
+}
+
+func mainError() error {
Review Comment:
mainError isn't a good name. Convention would call it "tryMain", but that's
also a bad name.
If anything, it should be "launchSDKProcess" or similar, which documents its
purpose.
##########
sdks/python/container/boot.go:
##########
@@ -137,46 +145,49 @@ func main() {
options, err := provision.ProtoToJSON(info.GetPipelineOptions())
if err != nil {
- log.Fatalf("Failed to convert pipeline options: %v", err)
+ return fmt.Errorf("Failed to convert pipeline options: %v", err)
}
// (2) Retrieve and install the staged packages.
//
- // Guard from concurrent artifact retrieval and installation,
- // when called by child processes in a worker pool.
+ // No log.Fatalf() from here on, otherwise deferred cleanups will not be called!
Review Comment:
As an aside, the boot.go mains are long overdue for refactoring to be more
testable and to share code with each other. They all duplicate very common
portions, leading to repeated work when implementing features/capabilities in
any of the preamble.
That's out of scope for this PR though.
##########
sdks/python/container/boot.go:
##########
@@ -73,13 +74,20 @@ const (
)
func main() {
+ if err := mainError(); err != nil {
+ log.Print(err)
+ os.Exit(1)
+ }
+}
+
+func mainError() error {
flag.Parse()
Review Comment:
Conventionally, flag.Parse() should remain in main(), rather than in some
called-out function.
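A minimal sketch of what I mean, assuming a single `id` flag: main parses the flags and passes plain values down, so the helper does not depend on global flag state. The flag's default value and the helper body are hypothetical, chosen only to keep the example runnable.

```go
package main

import (
	"flag"
	"fmt"
	"log"
)

// Hypothetical flag for this sketch; the default exists only so the example
// runs without arguments.
var id = flag.String("id", "demo-worker", "Local identifier.")

// launchSDKProcess receives already-parsed values, which lets a test call it
// directly without touching the flag package.
func launchSDKProcess(workerID string) error {
	if workerID == "" {
		return fmt.Errorf("no id provided")
	}
	fmt.Println("launching", workerID)
	return nil
}

func main() {
	flag.Parse() // parsing stays in main
	if err := launchSDKProcess(*id); err != nil {
		log.Fatal(err)
	}
}
```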
##########
sdks/python/container/boot.go:
##########
@@ -200,24 +211,112 @@ func main() {
}
}
+ workerIds := append([]string{*id}, info.GetSiblingWorkerIds()...)
+
+ // Keep track of child PIDs for clean shutdown without zombies
+ childPids := struct {
+ v []int
+ canceled bool
+ mu sync.Mutex
+ } {v: make([]int, 0, len(workerIds))}
+
+ // Forward trapped signals to child process groups in order to terminate them gracefully and avoid zombies
+ go func() {
+ log.Printf("Received signal: %v", <-signalChannel)
+ childPids.mu.Lock()
+ childPids.canceled = true
+ for _, pid := range childPids.v {
+ go func(pid int) {
+ // This goroutine will be canceled if the main process exits before the 5 seconds
+ // have elapsed, i.e., as soon as all subprocesses have returned from Wait().
+ time.Sleep(5 * time.Second)
+ if err := syscall.Kill(-pid, syscall.SIGKILL); err == nil {
+ log.Printf("Worker process %v did not respond, killed it.", pid)
+ }
+ }(pid)
+ syscall.Kill(-pid, syscall.SIGTERM)
Review Comment:
Q: Why is the negative pid being used here? I'd have expected simply `pid`.
I haven't used syscall.Kill before, and it doesn't have any doc comment,
https://pkg.go.dev/syscall#Kill, so it's very odd to me. If there's some magic
here, please add a comment explaining the oddity.