This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 06c76b4e930 Updates the test suite to use the transform service 
(#30605)
06c76b4e930 is described below

commit 06c76b4e930d76c6e38a9a78fce3ccd223de4cbd
Author: Chamikara Jayalath <[email protected]>
AuthorDate: Tue Mar 12 17:00:30 2024 -0700

    Updates the test suite to use the transform service (#30605)
    
    * Updates the test suite to use the transform service
    
    * Trigger the test suite
    
    * Fix formatting
    
    * Addressing reviewer comments
---
 .../beam_PostCommit_TransformService_Direct.json   |  3 +++
 .../apache_beam/io/gcp/bigtableio_it_test.py       | 16 +++++++++++++--
 sdks/python/expansion-service-container/boot.go    | 24 +++++++++++++++++-----
 3 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_TransformService_Direct.json 
b/.github/trigger_files/beam_PostCommit_TransformService_Direct.json
new file mode 100644
index 00000000000..c4edaa85a89
--- /dev/null
+++ b/.github/trigger_files/beam_PostCommit_TransformService_Direct.json
@@ -0,0 +1,3 @@
+{
+  "comment": "Modify this file in a trivial way to cause this test suite to 
run"
+}
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py 
b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
index 8c0ca29c410..2445fdd5fcc 100644
--- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
@@ -93,6 +93,11 @@ class TestReadFromBigTableIT(unittest.TestCase):
     self.table = self.instance.table(self.TABLE_ID)
     self.table.create()
     _LOGGER.info("Created table [%s]", self.table.table_id)
+    if (os.environ.get('TRANSFORM_SERVICE_PORT')):
+      self._transform_service_address = (
+          'localhost:' + os.environ.get('TRANSFORM_SERVICE_PORT'))
+    else:
+      self._transform_service_address = None
 
   def tearDown(self):
     try:
@@ -142,7 +147,8 @@ class TestReadFromBigTableIT(unittest.TestCase):
           | bigtableio.ReadFromBigtable(
               project_id=self.project,
               instance_id=self.instance.instance_id,
-              table_id=self.table.table_id)
+              table_id=self.table.table_id,
+              expansion_service=self._transform_service_address)
           | "Extract cells" >> beam.Map(lambda row: row._cells))
 
       assert_that(cells, equal_to(expected_cells))
@@ -190,6 +196,11 @@ class TestWriteToBigtableXlangIT(unittest.TestCase):
         (self.TABLE_ID, str(int(time.time())), secrets.token_hex(3)))
     self.table.create()
     _LOGGER.info("Created table [%s]", self.table.table_id)
+    if (os.environ.get('TRANSFORM_SERVICE_PORT')):
+      self._transform_service_address = (
+          'localhost:' + os.environ.get('TRANSFORM_SERVICE_PORT'))
+    else:
+      self._transform_service_address = None
 
   def tearDown(self):
     try:
@@ -216,7 +227,8 @@ class TestWriteToBigtableXlangIT(unittest.TestCase):
               project_id=self.project,
               instance_id=self.instance.instance_id,
               table_id=self.table.table_id,
-              use_cross_language=True))
+              use_cross_language=True,
+              expansion_service=self._transform_service_address))
 
   def test_set_mutation(self):
     row1: DirectRow = DirectRow('key-1')
diff --git a/sdks/python/expansion-service-container/boot.go 
b/sdks/python/expansion-service-container/boot.go
index ba56b349c4e..a04b176f5d8 100644
--- a/sdks/python/expansion-service-container/boot.go
+++ b/sdks/python/expansion-service-container/boot.go
@@ -97,8 +97,8 @@ func installExtraPackages(requirementsFile string) error {
        return nil
 }
 
-func getUpdatedRequirementsFile(oldRequirementsFileName string, 
dependenciesDir string) (string, error) {
-       oldExtraPackages, err := getLines(filepath.Join(dependenciesDir, 
oldRequirementsFileName))
+func getUpdatedRequirementsFile(oldDependenciesRequirementsFile string, 
dependenciesDir string) (string, error) {
+       oldExtraPackages, err := getLines(oldDependenciesRequirementsFile)
        if err != nil {
                return "", err
        }
@@ -145,9 +145,20 @@ func launchExpansionServiceProcess() error {
 
        args := []string{"-m", expansionServiceEntrypoint, "-p", 
strconv.Itoa(*port), "--fully_qualified_name_glob", "*"}
 
-       if *requirements_file != "" {
-               log.Printf("Received the requirements file %v", 
*requirements_file)
-               updatedRequirementsFileName, err := 
getUpdatedRequirementsFile(*requirements_file, *dependencies_dir)
+       // Requirements file with dependencies to install.
+       // Note that we have to look for the requirements file in the 
dependencies
+       // volume here not the requirements file at the top level. Latter 
provides
+       // Beam dependencies.
+       dependencies_requirements_file := filepath.Join(*dependencies_dir, 
*requirements_file)
+       dependencies_requirements_file_exists := false
+       if _, err := os.Stat(dependencies_requirements_file); err == nil {
+               dependencies_requirements_file_exists = true
+       }
+
+       // We only try to install dependencies, if the requirements file exists.
+       if dependencies_requirements_file_exists {
+               log.Printf("Received the requirements file %s with extra 
packages.", dependencies_requirements_file)
+               updatedRequirementsFileName, err := 
getUpdatedRequirementsFile(dependencies_requirements_file, *dependencies_dir)
                if err != nil {
                        return err
                }
@@ -161,7 +172,10 @@ func launchExpansionServiceProcess() error {
                if err != nil {
                        return err
                }
+       } else {
+               log.Printf("Requirements file %s was provided but not 
available.", dependencies_requirements_file)
        }
+
        if err := execx.Execute(pythonVersion, args...); err != nil {
                return fmt.Errorf("could not start the expansion service: %s", 
err)
        }

Reply via email to