This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 45df0ac6 [BEAM-13399] Add integration test for Go SDK expansion service JAR pull/start-up (#16313)
45df0ac6 is described below

commit 45df0ac6a3cf28e5b8ebf8a6899e3640c520f09d
Author: Jack McCluskey <34928439+jrmcclus...@users.noreply.github.com>
AuthorDate: Tue Dec 21 18:04:32 2021 -0500

    [BEAM-13399] Add integration test for Go SDK expansion service JAR pull/start-up (#16313)
---
 .../beam/core/runtime/xlangx/expansionx/process.go |  4 ++
 sdks/go/test/integration/xlang/expansion_test.go   | 68 ++++++++++++++++++++++
 2 files changed, 72 insertions(+)

diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go
index 935aeff..fa94835 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/expansionx/process.go
@@ -18,6 +18,7 @@ package expansionx
 import (
        "fmt"
        "os/exec"
+       "time"
 )
 
 // ExpansionServiceRunner is a type that holds information required to
@@ -51,6 +52,9 @@ func (e *ExpansionServiceRunner) StartService() error {
        if err != nil {
                return err
        }
+       // Start() is non-blocking so a brief sleep to let the JAR start up and begin accepting
+       // connections is necessary.
+       time.Sleep(2 * time.Second)
        if e.serviceCommand.ProcessState != nil {
                return fmt.Errorf("process %v exited when it should still be running", e.serviceCommand.Process)
        }
diff --git a/sdks/go/test/integration/xlang/expansion_test.go b/sdks/go/test/integration/xlang/expansion_test.go
new file mode 100644
index 0000000..070fccf
--- /dev/null
+++ b/sdks/go/test/integration/xlang/expansion_test.go
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package xlang
+
+import (
+       "os"
+       "os/exec"
+       "strings"
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/xlangx/expansionx"
+       "github.com/apache/beam/sdks/v2/go/test/integration"
+)
+
+const (
+       // TODO(BEAM-13505): Select the most recent Beam release instead of a hard-coded
+       // string.
+       beamVersion   = "2.34.0"
+       gradleTarget  = ":sdks:java:io:expansion-service:runExpansionService"
+       expansionPort = "8097"
+)
+
+func checkPort(t *testing.T, port string) {
+       ping := exec.Command("nc", "-vz", "localhost", port)
+       output, err := ping.CombinedOutput()
+       if err != nil {
+               t.Errorf("failed to run ping to localhost:%v", port)
+       }
+       outputStr := string(output)
+       if strings.Contains(outputStr, "failed") {
+               t.Errorf("failed to connect to localhost:%v, got err %v", port, outputStr)
+       }
+}
+
+func TestAutomatedExpansionService(t *testing.T) {
+       integration.CheckFilters(t)
+       jarPath, err := expansionx.GetBeamJar(gradleTarget, beamVersion)
+       if err != nil {
+               t.Fatalf("failed to get JAR path, got %v", err)
+       }
+       t.Cleanup(func() { os.Remove(jarPath) })
+
+       serviceRunner := expansionx.NewExpansionServiceRunner(jarPath, expansionPort)
+       err = serviceRunner.StartService()
+       if err != nil {
+               t.Errorf("failed to start expansion service JAR, got %v", err)
+       }
+
+       checkPort(t, expansionPort)
+
+       err = serviceRunner.StopService()
+       if err != nil {
+               t.Errorf("failed to stop expansion service JAR, got %v", err)
+       }
+}

Reply via email to