gemini-code-assist[bot] commented on code in PR #39079:
URL: https://github.com/apache/beam/pull/39079#discussion_r3464409151
##########
sdks/python/apache_beam/runners/direct/transform_evaluator.py:
##########
@@ -577,13 +578,45 @@ def finish_bundle(self):
         self, self.bundles, unprocessed_bundles, None, {None: self.watermark})
+class _PubSubSubscriberClient(object):
+  """SubscriberClient state cached for one DirectRunner Pub/Sub read."""
+  def __init__(self, client):
+    self.client = client
+    self._temporary_subscription = None
+    self._closed = False
+
+  def set_temporary_subscription(self, subscription):
+    self._temporary_subscription = subscription
+
+  def close(self):
+    if self._closed:
+      return
+    self._closed = True
+
+    try:
+      if self._temporary_subscription:
+        self.client.delete_subscription(
+            subscription=self._temporary_subscription)
+    except Exception:
+      _LOGGER.warning(
+          'Failed to delete temporary Pub/Sub subscription %s',
+          self._temporary_subscription,
+          exc_info=True)
+
+    try:
+      self.client.close()
+    except Exception:
+      _LOGGER.warning(
+          'Failed to close Pub/Sub subscriber client', exc_info=True)
Review Comment:

During interpreter shutdown, module-level globals such as `_LOGGER` can be
cleared and set to `None`. Because `close()` is called from a finalizer during
garbage collection, which can also run during shutdown, check that `_LOGGER`
is not `None` before calling `_LOGGER.warning`; otherwise the `except` handler
itself raises an `AttributeError`.
```suggestion
    try:
      if self._temporary_subscription:
        self.client.delete_subscription(
            subscription=self._temporary_subscription)
    except Exception:
      if _LOGGER is not None:
        _LOGGER.warning(
            'Failed to delete temporary Pub/Sub subscription %s',
            self._temporary_subscription,
            exc_info=True)

    try:
      self.client.close()
    except Exception:
      if _LOGGER is not None:
        _LOGGER.warning(
            'Failed to close Pub/Sub subscriber client', exc_info=True)
```
##########
sdks/python/apache_beam/runners/direct/transform_evaluator.py:
##########
@@ -577,13 +578,45 @@ def finish_bundle(self):
         self, self.bundles, unprocessed_bundles, None, {None: self.watermark})
+class _PubSubSubscriberClient(object):
+  """SubscriberClient state cached for one DirectRunner Pub/Sub read."""
+  def __init__(self, client):
+    self.client = client
+    self._temporary_subscription = None
+    self._closed = False
Review Comment:

Add a per-client lock `self._lock` to serialize subscription creation for
each transform without holding the global class-level lock during network calls.
```suggestion
  def __init__(self, client):
    self.client = client
    self._temporary_subscription = None
    self._closed = False
    self._lock = threading.Lock()
```
##########
sdks/python/apache_beam/runners/direct/transform_evaluator.py:
##########
@@ -627,18 +660,35 @@ def get_subscription(
     if short_sub_name:
       return pubsub.SubscriberClient.subscription_path(project, short_sub_name)
-    if transform in cls._subscription_cache:
+    with cls._subscriber_client_cache_lock:
+      if transform in cls._subscription_cache:
+        return cls._subscription_cache[transform]
+
+      subscriber_client = cls._get_subscriber_client_state_unlocked(transform)
+      sub_client = subscriber_client.client
+      sub_name = sub_client.subscription_path(
+          sub_project,
+          'beam_%d_%x' % (int(time.time()), random.randrange(1 << 32)))
+      topic_name = sub_client.topic_path(project, short_topic_name)
+      sub_client.create_subscription(name=sub_name, topic=topic_name)
+      subscriber_client.set_temporary_subscription(sub_name)
+      cls._subscription_cache[transform] = sub_name
     return cls._subscription_cache[transform]
Review Comment:

Holding the global class-level lock `_subscriber_client_cache_lock` during
the synchronous network call `sub_client.create_subscription(...)` can block
all other threads trying to read from or initialize other Pub/Sub sources for
the full duration of that RPC. We can avoid this by releasing the global lock
before making the network call and using a per-transform lock
(`subscriber_client._lock`) to serialize subscription creation for the same
transform. The global lock is then held only for the in-memory cache lookups
and updates.
```suggestion
    with cls._subscriber_client_cache_lock:
      subscriber_client = (
          cls._get_subscriber_client_state_unlocked(transform))
    with subscriber_client._lock:
      with cls._subscriber_client_cache_lock:
        if transform in cls._subscription_cache:
          return cls._subscription_cache[transform]
      sub_client = subscriber_client.client
      sub_name = sub_client.subscription_path(
          sub_project,
          'beam_%d_%x' % (int(time.time()), random.randrange(1 << 32)))
      topic_name = sub_client.topic_path(project, short_topic_name)
      sub_client.create_subscription(name=sub_name, topic=topic_name)
      subscriber_client.set_temporary_subscription(sub_name)
      with cls._subscriber_client_cache_lock:
        cls._subscription_cache[transform] = sub_name
      return sub_name
```
--
This is an automated message from the Apache Git Service.