This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 06c76b4e930 Updates the test suite to use the transform service
(#30605)
06c76b4e930 is described below
commit 06c76b4e930d76c6e38a9a78fce3ccd223de4cbd
Author: Chamikara Jayalath <[email protected]>
AuthorDate: Tue Mar 12 17:00:30 2024 -0700
Updates the test suite to use the transform service (#30605)
* Updates the test suite to use the transform service
* Trigger the test suite
* Fix formatting
* Addressing reviewer comments
---
.../beam_PostCommit_TransformService_Direct.json | 3 +++
.../apache_beam/io/gcp/bigtableio_it_test.py | 16 +++++++++++++--
sdks/python/expansion-service-container/boot.go | 24 +++++++++++++++++-----
3 files changed, 36 insertions(+), 7 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_TransformService_Direct.json
b/.github/trigger_files/beam_PostCommit_TransformService_Direct.json
new file mode 100644
index 00000000000..c4edaa85a89
--- /dev/null
+++ b/.github/trigger_files/beam_PostCommit_TransformService_Direct.json
@@ -0,0 +1,3 @@
+{
+ "comment": "Modify this file in a trivial way to cause this test suite to
run"
+}
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
index 8c0ca29c410..2445fdd5fcc 100644
--- a/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
@@ -93,6 +93,11 @@ class TestReadFromBigTableIT(unittest.TestCase):
self.table = self.instance.table(self.TABLE_ID)
self.table.create()
_LOGGER.info("Created table [%s]", self.table.table_id)
+ if (os.environ.get('TRANSFORM_SERVICE_PORT')):
+ self._transform_service_address = (
+ 'localhost:' + os.environ.get('TRANSFORM_SERVICE_PORT'))
+ else:
+ self._transform_service_address = None
def tearDown(self):
try:
@@ -142,7 +147,8 @@ class TestReadFromBigTableIT(unittest.TestCase):
| bigtableio.ReadFromBigtable(
project_id=self.project,
instance_id=self.instance.instance_id,
- table_id=self.table.table_id)
+ table_id=self.table.table_id,
+ expansion_service=self._transform_service_address)
| "Extract cells" >> beam.Map(lambda row: row._cells))
assert_that(cells, equal_to(expected_cells))
@@ -190,6 +196,11 @@ class TestWriteToBigtableXlangIT(unittest.TestCase):
(self.TABLE_ID, str(int(time.time())), secrets.token_hex(3)))
self.table.create()
_LOGGER.info("Created table [%s]", self.table.table_id)
+ if (os.environ.get('TRANSFORM_SERVICE_PORT')):
+ self._transform_service_address = (
+ 'localhost:' + os.environ.get('TRANSFORM_SERVICE_PORT'))
+ else:
+ self._transform_service_address = None
def tearDown(self):
try:
@@ -216,7 +227,8 @@ class TestWriteToBigtableXlangIT(unittest.TestCase):
project_id=self.project,
instance_id=self.instance.instance_id,
table_id=self.table.table_id,
- use_cross_language=True))
+ use_cross_language=True,
+ expansion_service=self._transform_service_address))
def test_set_mutation(self):
row1: DirectRow = DirectRow('key-1')
diff --git a/sdks/python/expansion-service-container/boot.go
b/sdks/python/expansion-service-container/boot.go
index ba56b349c4e..a04b176f5d8 100644
--- a/sdks/python/expansion-service-container/boot.go
+++ b/sdks/python/expansion-service-container/boot.go
@@ -97,8 +97,8 @@ func installExtraPackages(requirementsFile string) error {
return nil
}
-func getUpdatedRequirementsFile(oldRequirementsFileName string,
dependenciesDir string) (string, error) {
- oldExtraPackages, err := getLines(filepath.Join(dependenciesDir,
oldRequirementsFileName))
+func getUpdatedRequirementsFile(oldDependenciesRequirementsFile string,
dependenciesDir string) (string, error) {
+ oldExtraPackages, err := getLines(oldDependenciesRequirementsFile)
if err != nil {
return "", err
}
@@ -145,9 +145,20 @@ func launchExpansionServiceProcess() error {
args := []string{"-m", expansionServiceEntrypoint, "-p",
strconv.Itoa(*port), "--fully_qualified_name_glob", "*"}
- if *requirements_file != "" {
- log.Printf("Received the requirements file %v",
*requirements_file)
- updatedRequirementsFileName, err :=
getUpdatedRequirementsFile(*requirements_file, *dependencies_dir)
+ // Requirements file with dependencies to install.
+ // Note that we have to look for the requirements file in the
dependencies
+ // volume here not the requirements file at the top level. Latter
provides
+ // Beam dependencies.
+ dependencies_requirements_file := filepath.Join(*dependencies_dir,
*requirements_file)
+ dependencies_requirements_file_exists := false
+ if _, err := os.Stat(dependencies_requirements_file); err == nil {
+ dependencies_requirements_file_exists = true
+ }
+
+ // We only try to install dependencies, if the requirements file exists.
+ if dependencies_requirements_file_exists {
+ log.Printf("Received the requirements file %s with extra
packages.", dependencies_requirements_file)
+ updatedRequirementsFileName, err :=
getUpdatedRequirementsFile(dependencies_requirements_file, *dependencies_dir)
if err != nil {
return err
}
@@ -161,7 +172,10 @@ func launchExpansionServiceProcess() error {
if err != nil {
return err
}
+ } else {
+ log.Printf("Requirements file %s was provided but not
available.", dependencies_requirements_file)
}
+
if err := execx.Execute(pythonVersion, args...); err != nil {
return fmt.Errorf("could not start the expansion service: %s",
err)
}