This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a9f50fae94b Python ExternalTransformProvider improvements (#33359)
a9f50fae94b is described below

commit a9f50fae94b49a4e83acb131c04a34731a4a6604
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Tue Dec 17 16:30:21 2024 -0500

    Python ExternalTransformProvider improvements (#33359)
---
 sdks/python/apache_beam/transforms/external.py     | 14 ++---
 .../transforms/external_transform_provider.py      | 65 +++++++++++++++-------
 2 files changed, 51 insertions(+), 28 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index e44f7482dc6..fb37a8fd974 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -962,14 +962,14 @@ class JavaJarExpansionService(object):
       self, path_to_jar, extra_args=None, classpath=None, append_args=None):
     if extra_args and append_args:
       raise ValueError('Only one of extra_args or append_args may be provided')
-    self._path_to_jar = path_to_jar
+    self.path_to_jar = path_to_jar
     self._extra_args = extra_args
     self._classpath = classpath or []
     self._service_count = 0
     self._append_args = append_args or []
 
   def is_existing_service(self):
-    return subprocess_server.is_service_endpoint(self._path_to_jar)
+    return subprocess_server.is_service_endpoint(self.path_to_jar)
 
   @staticmethod
   def _expand_jars(jar):
@@ -997,7 +997,7 @@ class JavaJarExpansionService(object):
   def _default_args(self):
     """Default arguments to be used by `JavaJarExpansionService`."""
 
-    to_stage = ','.join([self._path_to_jar] + sum((
+    to_stage = ','.join([self.path_to_jar] + sum((
         JavaJarExpansionService._expand_jars(jar)
         for jar in self._classpath or []), []))
     args = ['{{PORT}}', f'--filesToStage={to_stage}']
@@ -1009,8 +1009,8 @@ class JavaJarExpansionService(object):
 
   def __enter__(self):
     if self._service_count == 0:
-      self._path_to_jar = subprocess_server.JavaJarServer.local_jar(
-          self._path_to_jar)
+      self.path_to_jar = subprocess_server.JavaJarServer.local_jar(
+          self.path_to_jar)
       if self._extra_args is None:
         self._extra_args = self._default_args() + self._append_args
       # Consider memoizing these servers (with some timeout).
@@ -1018,7 +1018,7 @@ class JavaJarExpansionService(object):
           'Starting a JAR-based expansion service from JAR %s ' + (
               'and with classpath: %s' %
               self._classpath if self._classpath else ''),
-          self._path_to_jar)
+          self.path_to_jar)
       classpath_urls = [
           subprocess_server.JavaJarServer.local_jar(path)
           for jar in self._classpath
@@ -1026,7 +1026,7 @@ class JavaJarExpansionService(object):
       ]
       self._service_provider = subprocess_server.JavaJarServer(
           ExpansionAndArtifactRetrievalStub,
-          self._path_to_jar,
+          self.path_to_jar,
           self._extra_args,
           classpath=classpath_urls)
       self._service = self._service_provider.__enter__()
diff --git a/sdks/python/apache_beam/transforms/external_transform_provider.py b/sdks/python/apache_beam/transforms/external_transform_provider.py
index 117c7f7c9b9..b22cd4b24cb 100644
--- a/sdks/python/apache_beam/transforms/external_transform_provider.py
+++ b/sdks/python/apache_beam/transforms/external_transform_provider.py
@@ -26,6 +26,7 @@ from typing import Tuple
 
 from apache_beam.transforms import PTransform
 from apache_beam.transforms.external import BeamJarExpansionService
+from apache_beam.transforms.external import JavaJarExpansionService
 from apache_beam.transforms.external import SchemaAwareExternalTransform
 from apache_beam.transforms.external import SchemaTransformsConfig
 from apache_beam.typehints.schemas import named_tuple_to_schema
@@ -133,37 +134,57 @@ class ExternalTransformProvider:
   (see the `urn_pattern` parameter).
 
   These classes are generated when :class:`ExternalTransformProvider` is
-  initialized. We need to give it one or more expansion service addresses that
-  are already up and running:
-  >>> provider = ExternalTransformProvider(["localhost:12345",
-  ...                                             "localhost:12121"])
-  We can also give it the gradle target of a standard Beam expansion service:
-  >>> provider = ExternalTransform(BeamJarExpansionService(
-  ...     "sdks:java:io:google-cloud-platform:expansion-service:shadowJar"))
-  Let's take a look at the output of :func:`get_available()` to know the
-  available transforms in the expansion service(s) we provided:
+  initialized. You can give it an expansion service address that is already
+  up and running:
+
+  >>> provider = ExternalTransformProvider("localhost:12345")
+
+  Or you can give it the path to an expansion service Jar file:
+
+  >>> provider = ExternalTransformProvider(JavaJarExpansionService(
+          "path/to/expansion-service.jar"))
+
+  Or you can give it the gradle target of a standard Beam expansion service:
+
+  >>> provider = ExternalTransformProvider(BeamJarExpansionService(
+          "sdks:java:io:google-cloud-platform:expansion-service:shadowJar"))
+
+  Note that you can provide a list of these services:
+
+  >>> provider = ExternalTransformProvider([
+          "localhost:12345",
+          JavaJarExpansionService("path/to/expansion-service.jar"),
+          BeamJarExpansionService(
+            "sdks:java:io:google-cloud-platform:expansion-service:shadowJar")])
+
+  The output of :func:`get_available()` provides a list of available transforms
+  in the provided expansion service(s):
+
   >>> provider.get_available()
   [('JdbcWrite', 'beam:schematransform:org.apache.beam:jdbc_write:v1'),
   ('BigtableRead', 'beam:schematransform:org.apache.beam:bigtable_read:v1'),
   ...]
 
-  Then retrieve a transform by :func:`get()`, :func:`get_urn()`, or by directly
-  accessing it as an attribute of :class:`ExternalTransformProvider`.
-  All of the following commands do the same thing:
+  You can retrieve a transform with :func:`get()`, :func:`get_urn()`, or by
+  directly accessing it as an attribute. The following lines all do the same
+  thing:
+
   >>> provider.get('BigqueryStorageRead')
   >>> provider.get_urn(
-  ...       'beam:schematransform:org.apache.beam:bigquery_storage_read:v1')
+            'beam:schematransform:org.apache.beam:bigquery_storage_read:v1')
   >>> provider.BigqueryStorageRead
 
-  You can inspect the transform's documentation to know more about it. This
-  returns some documentation only IF the underlying SchemaTransform
-  implementation provides any.
+  You can inspect the transform's documentation for more details. The following
+  returns the documentation provided by the underlying SchemaTransform. If no
+  such documentation is provided, this will be empty.
+
   >>> import inspect
   >>> inspect.getdoc(provider.BigqueryStorageRead)
 
   Similarly, you can inspect the transform's signature to know more about its
   parameters, including their names, types, and any documentation that the
   underlying SchemaTransform may provide:
+
   >>> inspect.signature(provider.BigqueryStorageRead)
   (query: 'typing.Union[str, NoneType]: The SQL query to be executed to...',
   row_restriction: 'typing.Union[str, NoneType]: Read only rows that match...',
@@ -178,8 +199,6 @@ class ExternalTransformProvider:
                 query=query,
                 row_restriction=restriction)
         | 'Some processing' >> beam.Map(...))
-
-  Experimental; no backwards compatibility guarantees.
   """
   def __init__(self, expansion_services, urn_pattern=STANDARD_URN_PATTERN):
     f"""Initialize an ExternalTransformProvider
@@ -188,6 +207,7 @@ class ExternalTransformProvider:
       A list of expansion services to discover transforms from.
       Supported forms:
       * a string representing the expansion service address
+      * a :attr:`JavaJarExpansionService` pointing to the path of a Java Jar
       * a :attr:`BeamJarExpansionService` pointing to a gradle target
     :param urn_pattern:
       The regular expression used to match valid transforms. In addition to
@@ -213,11 +233,14 @@ class ExternalTransformProvider:
       target = service
       if isinstance(service, BeamJarExpansionService):
         target = service.gradle_target
+      if isinstance(service, JavaJarExpansionService):
+        target = service.path_to_jar
       try:
         schematransform_configs = SchemaAwareExternalTransform.discover(service)
       except Exception as e:
         logging.exception(
-            "Encountered an error while discovering expansion service %s:\n%s",
+            "Encountered an error while discovering "
+            "expansion service at '%s':\n%s",
             target,
             e)
         continue
@@ -249,7 +272,7 @@ class ExternalTransformProvider:
 
       if skipped_urns:
         logging.info(
-            "Skipped URN(s) in %s that don't follow the pattern \"%s\": %s",
+            "Skipped URN(s) in '%s' that don't follow the pattern \"%s\": %s",
             target,
             self._urn_pattern,
             skipped_urns)
@@ -262,7 +285,7 @@ class ExternalTransformProvider:
     return list(self._name_to_urn.items())
 
   def get_all(self) -> Dict[str, ExternalTransform]:
-    """Get all ExternalTransform"""
+    """Get all ExternalTransforms"""
     return self._transforms
 
   def get(self, name) -> ExternalTransform:

Reply via email to