gemini-code-assist[bot] commented on code in PR #39028:
URL: https://github.com/apache/beam/pull/39028#discussion_r3439832022
##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -394,6 +402,69 @@ def __init__(self, client=None, temp_dataset_id=None,
temp_table_ref=None):
self.created_temp_dataset = False
+ @classmethod
+ def _table_definition_cache_key(cls, project_id, dataset_id, table_id):
+ return (project_id, dataset_id, table_id)
+
+ @classmethod
+ def _get_cached_table_definition(cls, project_id, dataset_id, table_id):
+ cache_key = cls._table_definition_cache_key(
+ project_id, dataset_id, table_id)
+ now = time.monotonic()
+ with cls._table_definition_cache_lock:
+ cache_entry = cls._table_definition_cache.get(cache_key)
+ if cache_entry is None:
+ return None
+
+ expires_at, table = cache_entry
+ if expires_at <= now:
+ cls._table_definition_cache.pop(cache_key, None)
+ return None
+
+ cls._table_definition_cache.move_to_end(cache_key)
+ return table
+
+ @classmethod
+ def _cache_table_definition(cls, project_id, dataset_id, table_id, table):
+ table_type = getattr(bigquery, 'Table', None)
+ if table_type is None or not isinstance(table, table_type):
+ cls._invalidate_table_definition_cache(project_id, dataset_id, table_id)
+ return
+
+ cache_key = cls._table_definition_cache_key(
+ project_id, dataset_id, table_id)
+ expires_at = time.monotonic() + cls._TABLE_DEFINITION_CACHE_TTL_SECS
+ with cls._table_definition_cache_lock:
+ cls._table_definition_cache[cache_key] = (expires_at, table)
+ cls._table_definition_cache.move_to_end(cache_key)
+ while (len(cls._table_definition_cache)
+ > cls._TABLE_DEFINITION_CACHE_MAX_ENTRIES):
+ cls._table_definition_cache.popitem(last=False)
+
+ @classmethod
+ def _invalidate_table_definition_cache(
+ cls, project_id=None, dataset_id=None, table_id=None):
+ with cls._table_definition_cache_lock:
+ if (project_id is not None and dataset_id is not None and
+ table_id is not None):
+ cache_key = cls._table_definition_cache_key(
+ project_id, dataset_id, table_id)
+ cls._table_definition_cache.pop(cache_key, None)
+ return
+
Review Comment:

When clearing the entire cache (i.e., when `project_id`, `dataset_id`, and
`table_id` are all `None`), we can use `cls._table_definition_cache.clear()`
instead of building a list of all keys and popping them one by one. This is
much more efficient and simpler.
```python
with cls._table_definition_cache_lock:
  if project_id is None and dataset_id is None and table_id is None:
    cls._table_definition_cache.clear()
    return
  if (project_id is not None and dataset_id is not None and
      table_id is not None):
    cache_key = cls._table_definition_cache_key(
        project_id, dataset_id, table_id)
    cls._table_definition_cache.pop(cache_key, None)
    return
```
##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -358,6 +360,12 @@ class BigQueryWrapper(object):
HISTOGRAM_METRIC_LOGGER = MetricLogger()
+ # Shared by wrapper instances within one Python SDK worker process.
+ _TABLE_DEFINITION_CACHE_MAX_ENTRIES = 256
+ _TABLE_DEFINITION_CACHE_TTL_SECS = 60 * 60
Review Comment:

A TTL of 1 hour (`60 * 60` seconds) is quite long for caching table
definitions. In streaming pipelines or long-running jobs where table schemas
might be updated dynamically (e.g., adding a nullable column), workers could
use stale metadata for up to an hour, leading to schema mismatch errors.
Consider reducing the default TTL to a more conservative value (e.g., 5 or 10
minutes) or making it configurable via pipeline options.
```suggestion
_TABLE_DEFINITION_CACHE_TTL_SECS = 5 * 60
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]