[ https://issues.apache.org/jira/browse/BEAM-4666?focusedWorklogId=117086&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117086 ]

ASF GitHub Bot logged work on BEAM-4666:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Jun/18 22:27
            Start Date: 28/Jun/18 22:27
    Worklog Time Spent: 10m 
      Work Description: herohde closed pull request #5810: [BEAM-4666] Make Go run on Flink again
URL: https://github.com/apache/beam/pull/5810
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:


diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
index 9a38b8d89cd..814ef7d5b21 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/artifact/BeamFileSystemArtifactRetrievalService.java
@@ -21,6 +21,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -70,7 +71,12 @@ public static BeamFileSystemArtifactRetrievalService create() {
   public void getManifest(
       ArtifactApi.GetManifestRequest request,
       StreamObserver<ArtifactApi.GetManifestResponse> responseObserver) {
-    String token = request.getRetrievalToken();
+    final String token = request.getRetrievalToken();
+    if (Strings.isNullOrEmpty(token)) {
+      throw new StatusRuntimeException(
+              Status.INVALID_ARGUMENT.withDescription("Empty artifact token"));
+    }
+
     LOG.info("GetManifest for {}", token);
     try {
       ArtifactApi.ProxyManifest proxyManifest = MANIFEST_CACHE.get(token);
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
index 5ea0d6521ab..5a441696e2b 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
@@ -19,6 +19,9 @@
 
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.base.Strings;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
 import io.grpc.stub.StreamObserver;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -74,7 +77,12 @@ public static FnApiControlClientPoolService offeringClientsToPool(
   @Override
   public StreamObserver<BeamFnApi.InstructionResponse> control(
       StreamObserver<BeamFnApi.InstructionRequest> requestObserver) {
-    String workerId = headerAccessor.getSdkWorkerId();
+    final String workerId = headerAccessor.getSdkWorkerId();
+    if (Strings.isNullOrEmpty(workerId)) {
+      // TODO(BEAM-4149): Enforce proper worker id.
+      LOGGER.warn("No worker_id header provided in control request");
+    }
+
     LOGGER.info("Beam Fn Control client connected with id {}", workerId);
     FnApiControlClient newClient = FnApiControlClient.forRequestObserver(workerId, requestObserver);
     try {
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
index 04eb1cd02c3..78158b504c9 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
@@ -18,8 +18,6 @@
 package org.apache.beam.runners.fnexecution.environment;
 
 import com.google.common.collect.ImmutableList;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
@@ -114,8 +112,6 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep
     String workerId = idGenerator.getId();
 
     // Prepare docker invocation.
-    Path workerPersistentDirectory = Files.createTempDirectory("worker_persistent_directory");
-    Path semiPersistentDirectory = Files.createTempDirectory("semi_persistent_dir");
     String containerImage = environment.getUrl();
     // TODO: https://issues.apache.org/jira/browse/BEAM-4148 The default service address will not
     // work for Docker for Mac.
@@ -123,11 +119,9 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep
     String artifactEndpoint = retrievalServiceServer.getApiServiceDescriptor().getUrl();
     String provisionEndpoint = provisioningServiceServer.getApiServiceDescriptor().getUrl();
     String controlEndpoint = controlServiceServer.getApiServiceDescriptor().getUrl();
+
     List<String> volArg =
         ImmutableList.of(
-            "-v",
-            // TODO: Mac only allows temporary mounts under /tmp by default (as of 17.12).
-            String.format("%s:%s", workerPersistentDirectory, semiPersistentDirectory),
             // NOTE: Host networking does not work on Mac, but the command line flag is accepted.
             "--network=host");
 
@@ -137,8 +131,7 @@ public RemoteEnvironment createEnvironment(Environment environment) throws Excep
             String.format("--logging_endpoint=%s", loggingEndpoint),
             String.format("--artifact_endpoint=%s", artifactEndpoint),
             String.format("--provision_endpoint=%s", provisionEndpoint),
-            String.format("--control_endpoint=%s", controlEndpoint),
-            String.format("--semi_persist_dir=%s", semiPersistentDirectory));
+            String.format("--control_endpoint=%s", controlEndpoint));
 
     LOG.debug("Creating Docker Container with ID {}", workerId);
     // Wrap the blocking call to clientSource.get in case an exception is thrown.
diff --git a/sdks/go/pkg/beam/artifact/stage.go b/sdks/go/pkg/beam/artifact/stage.go
index c21ba5ce9bf..8ea5c0e692a 100644
--- a/sdks/go/pkg/beam/artifact/stage.go
+++ b/sdks/go/pkg/beam/artifact/stage.go
@@ -163,7 +163,7 @@ func Stage(ctx context.Context, client pb.ArtifactStagingServiceClient, key, fil
                stream.CloseAndRecv() // ignore error
                return nil, fmt.Errorf("failed to send chunks for %v: %v", filename, err)
        }
-       if _, err := stream.CloseAndRecv(); err != nil {
+       if _, err := stream.CloseAndRecv(); err != nil && err != io.EOF {
                return nil, fmt.Errorf("failed to close stream for %v: %v", filename, err)
        }
        if hash != stagedHash {
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index 14b6b7d4e0f..a59e53187df 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -69,10 +69,10 @@ func Marshal(edges []*graph.MultiEdge, opt *Options) (*pb.Pipeline, error) {
 
        var roots []string
        for _, edge := range tree.Edges {
-               roots = append(roots, m.addMultiEdge(edge))
+               roots = append(roots, m.addMultiEdge("", edge))
        }
        for _, t := range tree.Children {
-               roots = append(roots, m.addScopeTree(t))
+               roots = append(roots, m.addScopeTree("", t))
        }
 
        p := &pb.Pipeline{
@@ -117,18 +117,20 @@ func (m *marshaller) build() *pb.Components {
        }
 }
 
-func (m *marshaller) addScopeTree(s *ScopeTree) string {
+func (m *marshaller) addScopeTree(trunk string, s *ScopeTree) string {
        id := scopeID(s.Scope.Scope)
        if _, exists := m.transforms[id]; exists {
                return id
        }
 
+       uniqueName := fmt.Sprintf("%v/%v", trunk, s.Scope.Name)
+
        var subtransforms []string
        for _, edge := range s.Edges {
-               subtransforms = append(subtransforms, m.addMultiEdge(edge))
+               subtransforms = append(subtransforms, m.addMultiEdge(uniqueName, edge))
        }
        for _, tree := range s.Children {
-               subtransforms = append(subtransforms, m.addScopeTree(tree))
+               subtransforms = append(subtransforms, m.addScopeTree(uniqueName, tree))
        }
 
        // Compute the input/output for this scope:
@@ -143,7 +145,7 @@ func (m *marshaller) addScopeTree(s *ScopeTree) string {
        }
 
        transform := &pb.PTransform{
-               UniqueName:    s.Scope.Name,
+               UniqueName:    uniqueName,
                Subtransforms: subtransforms,
                Inputs:        diff(in, out),
                Outputs:       diff(out, in),
@@ -222,7 +224,7 @@ func inout(transform *pb.PTransform, in, out map[string]bool) {
        }
 }
 
-func (m *marshaller) addMultiEdge(edge NamedEdge) string {
+func (m *marshaller) addMultiEdge(trunk string, edge NamedEdge) string {
        id := StableMultiEdgeID(edge.Edge)
        if _, exists := m.transforms[id]; exists {
                return id
@@ -244,7 +246,7 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) string {
        }
 
        transform := &pb.PTransform{
-               UniqueName: edge.Name,
+               UniqueName: fmt.Sprintf("%v/%v", trunk, edge.Name),
                Spec:       m.makePayload(edge.Edge),
                Inputs:     inputs,
                Outputs:    outputs,
diff --git a/sdks/go/pkg/beam/core/runtime/harness/init/init.go b/sdks/go/pkg/beam/core/runtime/harness/init/init.go
index 9b37f0dac98..ca637d02711 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/init/init.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/init/init.go
@@ -31,6 +31,7 @@ import (
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness"
+       "github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
 )
 
 var (
@@ -81,7 +82,8 @@ func hook() {
        // Since Init() is hijacking main, it's appropriate to do as main
        // does, and establish the background context here.
 
-       if err := harness.Main(context.Background(), *loggingEndpoint, *controlEndpoint); err != nil {
+       ctx := grpcx.WriteWorkerID(context.Background(), *id)
+       if err := harness.Main(ctx, *loggingEndpoint, *controlEndpoint); err != nil {
                fmt.Fprintf(os.Stderr, "Worker failed: %v", err)
                os.Exit(1)
        }
diff --git a/sdks/go/pkg/beam/options/jobopts/options.go b/sdks/go/pkg/beam/options/jobopts/options.go
index 99df6b1b076..1e60f60fbf2 100644
--- a/sdks/go/pkg/beam/options/jobopts/options.go
+++ b/sdks/go/pkg/beam/options/jobopts/options.go
@@ -49,10 +49,6 @@ var (
 
        // Async determines whether to wait for job completion.
        Async = flag.Bool("async", false, "Do not wait for job completion.")
-
-       // InternalJavaRunner is the java class needed at this time for Java runners.
-       // To be removed.
-       InternalJavaRunner = flag.String("internal_java_runner", "", "Internal java runner class.")
 )
 
 // GetEndpoint returns the endpoint, if non empty and exits otherwise. Runners
diff --git a/sdks/go/pkg/beam/runners/flink/flink.go b/sdks/go/pkg/beam/runners/flink/flink.go
index 74bd307c9d8..0384b9888ef 100644
--- a/sdks/go/pkg/beam/runners/flink/flink.go
+++ b/sdks/go/pkg/beam/runners/flink/flink.go
@@ -20,7 +20,6 @@ import (
        "context"
 
        "github.com/apache/beam/sdks/go/pkg/beam"
-       "github.com/apache/beam/sdks/go/pkg/beam/options/jobopts"
        "github.com/apache/beam/sdks/go/pkg/beam/runners/universal"
 )
 
@@ -31,8 +30,5 @@ func init() {
 // Execute runs the given pipeline on Flink. Convenience wrapper over the
 // universal runner.
 func Execute(ctx context.Context, p *beam.Pipeline) error {
-       if *jobopts.InternalJavaRunner == "" {
-               *jobopts.InternalJavaRunner = "org.apache.beam.runners.flink.FlinkRunner"
-       }
        return universal.Execute(ctx, p)
 }
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
index 558c68e3be6..7f0541da5a5 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/execute.go
@@ -61,7 +61,7 @@ func Execute(ctx context.Context, p *pb.Pipeline, endpoint string, opt *JobOptio
                log.Infof(ctx, "Using specified worker binary: '%v'", bin)
        }
 
-       token, err := Stage(ctx, prepID, artifactEndpoint, st, bin)
+       token, err := Stage(ctx, prepID, artifactEndpoint, bin, st)
        if err != nil {
                return "", err
        }
diff --git a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
index 6b37da09d73..d1b5b3c346b 100644
--- a/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
+++ b/sdks/go/pkg/beam/runners/universal/runnerlib/job.go
@@ -41,9 +41,6 @@ type JobOptions struct {
 
        // Worker is the worker binary override.
        Worker string
-
-       // InternalJavaRunner is the class of the receiving Java runner. To be removed.
-       InternalJavaRunner string
 }
 
 // Prepare prepares a job to the given job service. It returns the preparation id
@@ -51,7 +48,6 @@ type JobOptions struct {
 func Prepare(ctx context.Context, client jobpb.JobServiceClient, p *pb.Pipeline, opt *JobOptions) (id, endpoint, stagingToken string, err error) {
        raw := runtime.RawOptionsWrapper{
                Options:     beam.PipelineOptions.Export(),
-               Runner:      opt.InternalJavaRunner,
                AppName:     opt.Name,
                Experiments: append(opt.Experiments, "beam_fn_api"),
        }
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go b/sdks/go/pkg/beam/runners/universal/universal.go
index 02aaa6f219c..8b592d3bdf2 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -54,7 +54,6 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
                Name:               jobopts.GetJobName(),
                Experiments:        jobopts.GetExperiments(),
                Worker:             *jobopts.WorkerBinary,
-               InternalJavaRunner: *jobopts.InternalJavaRunner,
        }
        _, err = runnerlib.Execute(ctx, pipeline, endpoint, opt, *jobopts.Async)
        return err
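
Note on the translate.go change above: addScopeTree and addMultiEdge now take a "trunk" prefix, so each PTransform's UniqueName is composed hierarchically from its enclosing scopes rather than being a bare scope or edge name. A minimal, self-contained Go sketch of that composition (the scope and transform names below are made up for illustration, not taken from the SDK):

    package main

    import "fmt"

    // composeName mirrors the fmt.Sprintf("%v/%v", trunk, name) pattern used in
    // addScopeTree/addMultiEdge: the enclosing scope's composite name ("trunk")
    // is prepended, so nested transforms get distinct, path-like names.
    func composeName(trunk, name string) string {
            return fmt.Sprintf("%v/%v", trunk, name)
    }

    func main() {
            // The root scope uses an empty trunk, as in Marshal's calls to
            // addMultiEdge("", edge) and addScopeTree("", t).
            scope := composeName("", "CountWords")
            fmt.Println(composeName(scope, "ParDo(extract)")) // prints /CountWords/ParDo(extract)
    }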


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 117086)
    Time Spent: 3h  (was: 2h 50m)

> Go SDK fails to stage artifacts on Flink
> ----------------------------------------
>
>                 Key: BEAM-4666
>                 URL: https://issues.apache.org/jira/browse/BEAM-4666
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-go
>            Reporter: Henning Rohde
>            Assignee: Henning Rohde
>            Priority: Major
>          Time Spent: 3h
>  Remaining Estimate: 0h
>
> $ go run examples/wordcount/wordcount.go --runner=flink --endpoint=localhost:8099 --output=/tmp/asd
> 2018/06/27 17:17:02 No container image specified. Using dev image: 'herohde-docker-apache.bintray.io/beam/go:latest'
> 2018/06/27 17:17:02 Prepared job with id: go-job-1-1530145022009357193_b49f59e5-4dfc-4b82-aa67-cad62a3f7d6a and staging token: {"sessionId":"go-job-1-1530145022009357193_b49f59e5-4dfc-4b82-aa67-cad62a3f7d6a","basePath":"/tmp/flink-artifacts"}
> 2018/06/27 17:17:02 Cross-compiling /Users/herohde/go/src/github.com/apache/beam/sdks/go/examples/wordcount/wordcount.go as /var/folders/s2/97strbs55_353t_t7r24yf94009w4s/T/beam-go-1530145022019779968
> 2018/06/27 17:17:12 Failed to execute job: failed to stage artifacts: failed to stage {"sessionId":"go-job-1-1530145022009357193_b49f59e5-4dfc-4b82-aa67-cad62a3f7d6a","basePath":"/tmp/flink-artifacts"} in 3 attempts: stat {"sessionId":"go-job-1-1530145022009357193_b49f59e5-4dfc-4b82-aa67-cad62a3f7d6a","basePath":"/tmp/flink-artifacts"}: no such file or directory; stat {"sessionId":"go-job-1-1530145022009357193_b49f59e5-4dfc-4b82-aa67-cad62a3f7d6a","basePath":"/tmp/flink-artifacts"}: no such file or directory; stat {"sessionId":"go-job-1-1530145022009357193_b49f59e5-4dfc-4b82-aa67-cad62a3f7d6a","basePath":"/tmp/flink-artifacts"}: no such file or directory; stat {"sessionId":"go-job-1-1530145022009357193_b49f59e5-4dfc-4b82-aa67-cad62a3f7d6a","basePath":"/tmp/flink-artifacts"}: no such file or directory
> exit status 1



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
