[ https://issues.apache.org/jira/browse/BEAM-4666?focusedWorklogId=117086&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117086 ]
ASF GitHub Bot logged work on BEAM-4666: ---------------------------------------- Author: ASF GitHub Bot Created on: 28/Jun/18 22:27 Start Date: 28/Jun/18 22:27 Worklog Time Spent: 10m Work Description: herohde closed pull request #5810: [BEAM-4666] Make Go run on Flink again URL: https://github.com/apache/beam/pull/5810 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java index 9a38b8d89cd..814ef7d5b21 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; @@ -70,7 +71,12 @@ public static BeamFileSystemArtifactRetrievalService create() { public void getManifest( ArtifactApi.GetManifestRequest request, StreamObserver<ArtifactApi.GetManifestResponse> responseObserver) { - String token = request.getRetrievalToken(); + final String token = request.getRetrievalToken(); + if (Strings.isNullOrEmpty(token)) { + throw new StatusRuntimeException( + Status.INVALID_ARGUMENT.withDescription("Empty artifact token")); + } + LOG.info("GetManifest for {}", token); try { ArtifactApi.ProxyManifest proxyManifest = MANIFEST_CACHE.get(token); diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java index 5ea0d6521ab..5a441696e2b 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java @@ -19,6 +19,9 @@ import static com.google.common.base.Preconditions.checkState; +import com.google.common.base.Strings; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.Collection; @@ -74,7 +77,12 @@ public static FnApiControlClientPoolService offeringClientsToPool( @Override public StreamObserver<BeamFnApi.InstructionResponse> control( StreamObserver<BeamFnApi.InstructionRequest> requestObserver) { - String workerId = headerAccessor.getSdkWorkerId(); + final String workerId = headerAccessor.getSdkWorkerId(); + if (Strings.isNullOrEmpty(workerId)) { + // TODO(BEAM-4149): Enforce proper worker id. + LOGGER.warn("No worker_id header provided in control request"); + } + LOGGER.info("Beam Fn Control client connected with id {}", workerId); FnApiControlClient newClient = FnApiControlClient.forRequestObserver(workerId, requestObserver); try { diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java index 04eb1cd02c3..78158b504c9 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java @@ -18,8 +18,6 @@ package org.apache.beam.runners.fnexecution.environment; import com.google.common.collect.ImmutableList; -import java.nio.file.Files; -import java.nio.file.Path; import java.time.Duration; import java.util.List; import java.util.concurrent.TimeoutException; @@ -114,8 +112,6 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep String workerId = idGenerator.getId(); // Prepare docker invocation. - Path workerPersistentDirectory = Files.createTempDirectory("worker_persistent_directory"); - Path semiPersistentDirectory = Files.createTempDirectory("semi_persistent_dir"); String containerImage = environment.getUrl(); // TODO: https://issues.apache.org/jira/browse/BEAM-4148 The default service address will not // work for Docker for Mac. @@ -123,11 +119,9 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep String artifactEndpoint = retrievalServiceServer.getApiServiceDescriptor().getUrl(); String provisionEndpoint = provisioningServiceServer.getApiServiceDescriptor().getUrl(); String controlEndpoint = controlServiceServer.getApiServiceDescriptor().getUrl(); + List<String> volArg = ImmutableList.of( - "-v", - // TODO: Mac only allows temporary mounts under /tmp by default (as of 17.12). - String.format("%s:%s", workerPersistentDirectory, semiPersistentDirectory), // NOTE: Host networking does not work on Mac, but the command line flag is accepted. "--network=host"); @@ -137,8 +131,7 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep String.format("--logging_endpoint=%s", loggingEndpoint), String.format("--artifact_endpoint=%s", artifactEndpoint), String.format("--provision_endpoint=%s", provisionEndpoint), - String.format("--control_endpoint=%s", controlEndpoint), - String.format("--semi_persist_dir=%s", semiPersistentDirectory)); + String.format("--control_endpoint=%s", controlEndpoint)); LOG.debug("Creating Docker Container with ID {}", workerId); // Wrap the blocking call to clientSource.get in case an exception is thrown. diff --git a/sdks/go/pkg/beam/artifact/stage.go b/sdks/go/pkg/beam/artifact/stage.go index c21ba5ce9bf..8ea5c0e692a 100644 --- a/sdks/go/pkg/beam/artifact/stage.go +++ b/sdks/go/pkg/beam/artifact/stage.go @@ -163,7 +163,7 @@ func Stage(ctx context.Context, client pb.ArtifactStagingServiceClient, key, fil stream.CloseAndRecv() // ignore error return nil, fmt.Errorf("failed to send chunks for %v: %v", filename, err) } - if _, err := stream.CloseAndRecv(); err != nil { + if _, err := stream.CloseAndRecv(); err != nil && err != io.EOF { return nil, fmt.Errorf("failed to close stream for %v: %v", filename, err) } if hash != stagedHash { diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index 14b6b7d4e0f..a59e53187df 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -69,10 +69,10 @@ func Marshal(edges []*graph.MultiEdge, opt *Options) (*pb.Pipeline, error) { var roots []string for _, edge := range tree.Edges { - roots = append(roots, m.addMultiEdge(edge)) + roots = append(roots, m.addMultiEdge("", edge)) } for _, t := range tree.Children { - roots = append(roots, m.addScopeTree(t)) + roots = append(roots, m.addScopeTree("", t)) } p := &pb.Pipeline{ @@ -117,18 +117,20 @@ func (m *marshaller) build() *pb.Components { } } -func (m *marshaller) addScopeTree(s *ScopeTree) string { +func (m *marshaller) addScopeTree(trunk string, s *ScopeTree) string { id := scopeID(s.Scope.Scope) if _, exists := m.transforms[id]; exists { return id } + uniqueName := fmt.Sprintf("%v/%v", trunk, s.Scope.Name) + var subtransforms []string for _, edge := range s.Edges { - subtransforms = append(subtransforms, m.addMultiEdge(edge)) + subtransforms = append(subtransforms, m.addMultiEdge(uniqueName, edge)) } for _, tree := range s.Children { - subtransforms = append(subtransforms, m.addScopeTree(tree)) + subtransforms = append(subtransforms, m.addScopeTree(uniqueName, tree)) } // Compute the input/output for this scope: @@ -143,7 +145,7 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string { } transform := &pb.PTransform{ - UniqueName: s.Scope.Name, + UniqueName: uniqueName, Subtransforms: subtransforms, Inputs: diff(in, out), Outputs: diff(out, in), @@ -222,7 +224,7 @@ func inout(transform *pb.PTransform, in, out map[string]bool) { } } -func (m *marshaller) addMultiEdge(edge NamedEdge) string { +func (m *marshaller) addMultiEdge(trunk string, edge NamedEdge) string { id := StableMultiEdgeID(edge.Edge) if _, exists := m.transforms[id]; exists { return id @@ -244,7 +246,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) string { } transform := &pb.PTransform{ - UniqueName: edge.Name, + UniqueName: fmt.Sprintf("%v/%v", trunk, edge.Name), Spec: m.makePayload(edge.Edge), Inputs: inputs, Outputs: outputs, diff --git a/sdks/go/pkg/beam/core/runtime/harness/init/init.go b/sdks/go/pkg/beam/core/runtime/harness/init/init.go index 9b37f0dac98..ca637d02711 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/init/init.go +++ b/sdks/go/pkg/beam/core/runtime/harness/init/init.go @@ -31,6 +31,7 @@ import ( "github.com/apache/beam/sdks/go/pkg/beam/core/runtime" "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness" + "github.com/apache/beam/sdks/go/pkg/beam/util/grpcx" ) var ( @@ -81,7 +82,8 @@ func hook() { // Since Init() is hijacking main, it's appropriate to do as main // does, and establish the background context here. - if err := harness.Main(context.Background(), *loggingEndpoint, *controlEndpoint); err != nil { + ctx := grpcx.WriteWorkerID(context.Background(), *id) + if err := harness.Main(ctx, *loggingEndpoint, *controlEndpoint); err != nil { fmt.Fprintf(os.Stderr, "Worker failed: %v", err) os.Exit(1) } diff --git a/sdks/go/pkg/beam/options/jobopts/options.go b/sdks/go/pkg/beam/options/jobopts/options.go index 99df6b1b076..1e60f60fbf2 100644 --- a/sdks/go/pkg/beam/options/jobopts/options.go +++ b/sdks/go/pkg/beam/options/jobopts/options.go @@ -49,10 +49,6 @@ var ( // Async determines whether to wait for job completion. Async = flag.Bool("async", false, "Do not wait for job completion.") - - // InternalJavaRunner is the java class needed at this time for Java runners. - // To be removed. - InternalJavaRunner = flag.String("internal_java_runner", "", "Internal java runner class.") ) // GetEndpoint returns the endpoint, if non empty and exits otherwise. Runners diff --git a/sdks/go/pkg/beam/runners/flink/flink.go b/sdks/go/pkg/beam/runners/flink/flink.go index 74bd307c9d8..0384b9888ef 100644 --- a/sdks/go/pkg/beam/runners/flink/flink.go +++ b/sdks/go/pkg/beam/runners/flink/flink.go @@ -20,7 +20,6 @@ import ( "context" "github.com/apache/beam/sdks/go/pkg/beam" - "github.com/apache/beam/sdks/go/pkg/beam/options/jobopts" "github.com/apache/beam/sdks/go/pkg/beam/runners/universal" ) @@ -31,8 +30,5 @@ func init() { // Execute runs the given pipeline on Flink. Convenience wrapper over the // universal runner. func Execute(ctx context.Context, p *beam.Pipeline) error { - if *jobopts.InternalJavaRunner == "" { - *jobopts.InternalJavaRunner = "org.apache.beam.runners.flink.FlinkRunner" - } return universal.Execute(ctx, p) } diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go index 558c68e3be6..7f0541da5a5 100644 --- a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go +++ b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go @@ -61,7 +61,7 @@ func Execute(ctx context.Context, p *pb.Pipeline, endpoint string, opt *JobOptio log.Infof(ctx, "Using specified worker binary: '%v'", bin) } - token, err := Stage(ctx, prepID, artifactEndpoint, st, bin) + token, err := Stage(ctx, prepID, artifactEndpoint, bin, st) if err != nil { return "", err } diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go index 6b37da09d73..d1b5b3c346b 100644 --- a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go +++ b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go @@ -41,9 +41,6 @@ type JobOptions struct { // Worker is the worker binary override. Worker string - - // InternalJavaRunner is the class of the receiving Java runner. To be removed. - InternalJavaRunner string } // Prepare prepares a job to the given job service. It returns the preparation id @@ -51,7 +48,6 @@ type JobOptions struct { func Prepare(ctx context.Context, client jobpb.JobServiceClient, p *pb.Pipeline, opt *JobOptions) (id, endpoint, stagingToken string, err error) { raw := runtime.RawOptionsWrapper{ Options: beam.PipelineOptions.Export(), - Runner: opt.InternalJavaRunner, AppName: opt.Name, Experiments: append(opt.Experiments, "beam_fn_api"), } diff --git a/sdks/go/pkg/beam/runners/universal/universal.go b/sdks/go/pkg/beam/runners/universal/universal.go index 02aaa6f219c..8b592d3bdf2 100644 --- a/sdks/go/pkg/beam/runners/universal/universal.go +++ b/sdks/go/pkg/beam/runners/universal/universal.go @@ -54,7 +54,6 @@ func Execute(ctx context.Context, p *beam.Pipeline) error { Name: jobopts.GetJobName(), Experiments: jobopts.GetExperiments(), Worker: *jobopts.WorkerBinary, - InternalJavaRunner: *jobopts.InternalJavaRunner, } _, err = runnerlib.Execute(ctx, pipeline, endpoint, opt, *jobopts.Async) return err ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 117086) Time Spent: 3h (was: 2h 50m) > Go SDK fails to stage artifacts on Flink > ---------------------------------------- > > Key: BEAM-4666 > URL: https://issues.apache.org/jira/browse/BEAM-4666 > Project: Beam > Issue Type: Bug > Components: sdk-go > Reporter: Henning Rohde > Assignee: Henning Rohde > Priority: Major > Time Spent: 3h > Remaining Estimate: 0h > > $ go run examples/wordcount/wordcount.go --runner=flink > --endpoint=localhost:8099 --output=/tmp/asd > 2018/06/27 17:17:02 No container image specified. Using dev image: > 'herohde-docker-apache.bintray.io/beam/go:latest' > 2018/06/27 17:17:02 Prepared job with id: > go-job-1-1530145022009357193_b49f59e5-4dfc-4b82-aa67-cad62a3f7d6a and staging > token: > {"sessionId":"go-job-1-1530145022009357193_b49f59e5-4dfc-4b82-aa67-cad62a3f7d6a","basePath":"/tmp/flink-artifacts"} > 2018/06/27 17:17:02 Cross-compiling > /Users/herohde/go/src/github.com/apache/beam/sdks/go/examples/wordcount/wordcount.go > as > /var/folders/s2/97strbs55_353t_t7r24yf94009w4s/T/beam-go-1530145022019779968 > 2018/06/27 17:17:12 Failed to execute job: failed to stage artifacts: failed > to stage > {"sessionId":"go-job-1-1530145022009357193_b49f59e5-4dfc-4b82-aa67-cad62a3f7d6a","basePath":"/tmp/flink-artifacts"} > in 3 attempts: stat > {"sessionId":"go-job-1-1530145022009357193_b49f59e5-4dfc-4b82-aa67-cad62a3f7d6a","basePath":"/tmp/flink-artifacts"}: > no such file or directory; stat > {"sessionId":"go-job-1-1530145022009357193_b49f59e5-4dfc-4b82-aa67-cad62a3f7d6a","basePath":"/tmp/flink-artifacts"}: > no such file or directory; stat > {"sessionId":"go-job-1-1530145022009357193_b49f59e5-4dfc-4b82-aa67-cad62a3f7d6a","basePath":"/tmp/flink-artifacts"}: > no such file or directory; stat > {"sessionId":"go-job-1-1530145022009357193_b49f59e5-4dfc-4b82-aa67-cad62a3f7d6a","basePath":"/tmp/flink-artifacts"}: > no such file or directory > exit status 1 -- This message was sent by Atlassian JIRA (v7.6.3#76005)