This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new aeebe56720b Clean up some unneccessarily nested ifs in stage.go
(#26119)
aeebe56720b is described below
commit aeebe56720be4bf0442e923618fc712431fb82b0
Author: Danny McCormick <[email protected]>
AuthorDate: Wed Apr 5 10:54:41 2023 -0400
Clean up some unneccessarily nested ifs in stage.go (#26119)
---
.../playground_components/assets/symbols/go.g.yaml | 4 ++--
sdks/go/pkg/beam/runners/universal/runnerlib/stage.go | 19 ++++++++++---------
.../beam/runners/universal/runnerlib/stage_test.go | 2 +-
3 files changed, 13 insertions(+), 12 deletions(-)
diff --git a/playground/frontend/playground_components/assets/symbols/go.g.yaml
b/playground/frontend/playground_components/assets/symbols/go.g.yaml
index cf3b2a0c9c9..2c156f4d2d7 100644
--- a/playground/frontend/playground_components/assets/symbols/go.g.yaml
+++ b/playground/frontend/playground_components/assets/symbols/go.g.yaml
@@ -762,8 +762,8 @@
- StageDir
- StageFile
- StageModel
- - StageViaLegacyApi
- - StageViaPortableApi
+ - StageViaLegacyAPI
+ - StageViaPortableAPI
- StartLoopback
- StartReadTimestamp
- Step
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
b/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
index bb1f51c13c7..3cbc3343683 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/stage.go
@@ -43,16 +43,17 @@ func Stage(ctx context.Context, id, endpoint, binary, st
string) (retrievalToken
}
defer cc.Close()
- if err := StageViaPortableApi(ctx, cc, binary, st); err == nil {
+ if err := StageViaPortableAPI(ctx, cc, binary, st); err == nil {
return "", nil
- } else {
- log.Warnf(ctx, "unable to stage with PortableAPI: %v; falling
back to legacy", err)
}
+ log.Warnf(ctx, "unable to stage with PortableAPI: %v; falling back to
legacy", err)
- return StageViaLegacyApi(ctx, cc, binary, st)
+ return StageViaLegacyAPI(ctx, cc, binary, st)
}
-func StageViaPortableApi(ctx context.Context, cc *grpc.ClientConn, binary, st
string) (retErr error) {
+// StageViaPortableAPI stages the worker binary and any additional files
+// using the given grpc connection for the Portable API.
+func StageViaPortableAPI(ctx context.Context, cc *grpc.ClientConn, binary, st
string) (retErr error) {
const attempts = 3
var failures []string
for {
@@ -60,9 +61,7 @@ func StageViaPortableApi(ctx context.Context, cc
*grpc.ClientConn, binary, st st
if err == nil {
return nil // success!
}
- if err != nil {
- failures = append(failures, err.Error())
- }
+ failures = append(failures, err.Error())
if len(failures) > attempts {
return errors.Errorf("failed to stage artifacts for
token %v in %v attempts: %v", st, attempts, strings.Join(failures, ";\n"))
}
@@ -180,7 +179,9 @@ func stageFile(filename string, stream
jobpb.ArtifactStagingService_ReverseArtif
}
}
-func StageViaLegacyApi(ctx context.Context, cc *grpc.ClientConn, binary, st
string) (retrievalToken string, err error) {
+// StageViaLegacyAPI stages the worker binary and any additional files using
the
+// given grpc connection for the Legacy API. It returns the retrieval token if
successful.
+func StageViaLegacyAPI(ctx context.Context, cc *grpc.ClientConn, binary, st
string) (retrievalToken string, err error) {
client := jobpb.NewLegacyArtifactStagingServiceClient(cc)
files := []artifact.KeyedFile{{Key: "worker", Filename: binary}}
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/stage_test.go
b/sdks/go/pkg/beam/runners/universal/runnerlib/stage_test.go
index 2a1d784d4e7..81e4402fcbd 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/stage_test.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/stage_test.go
@@ -73,7 +73,7 @@ func TestPortableArtifactStaging(t *testing.T) {
das.wantToken = "token"
das.reqFile = "stage.go"
- err := StageViaPortableApi(ctx, cc, "reqFile", das.wantToken)
+ err := StageViaPortableAPI(ctx, cc, "reqFile", das.wantToken)
if err != nil {
t.Fatal(err)
}