gemini-code-assist[bot] commented on code in PR #39052:
URL: https://github.com/apache/beam/pull/39052#discussion_r3462300695
##########
sdks/python/apache_beam/yaml/integration_tests.py:
##########
@@ -68,6 +69,8 @@ def get_impl(self):
None, lambda payload, components, context: BigEndianIntegerCoder())
import psycopg2
+import pyarrow as pa
+import pyarrow.parquet as pq
Review Comment:

Top-level imports of optional dependencies like `pyarrow` can cause
`ImportError` when the module is imported in environments where these
dependencies are not installed. It is safer to import them lazily inside the
functions or context managers where they are actually used.
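
A minimal sketch of the lazy-import pattern described above. The helper names `write_sample_parquet` and `try_write_sample_parquet` are hypothetical and not part of the PR; the point is only that `pyarrow` is imported when the helper runs, so importing the enclosing module succeeds even when `pyarrow` is absent.

```python
import os
import tempfile


def write_sample_parquet(directory):
  """Hypothetical helper: writes a tiny Parquet file and returns its path."""
  # Deferred imports: an ImportError can only surface when this function is
  # called, not when the enclosing module is imported.
  import pyarrow as pa
  import pyarrow.parquet as pq

  path = os.path.join(directory, "part-00000.parquet")
  pq.write_table(pa.table({"name": ["a", "b", "c"]}), path)
  return path


def try_write_sample_parquet():
  """Returns the written file's size, or None when pyarrow is unavailable."""
  with tempfile.TemporaryDirectory() as temp_dir:
    try:
      return os.path.getsize(write_sample_parquet(temp_dir))
    except ImportError:
      return None
```

A caller that treats `None` as "skip this test" keeps the rest of the module usable in environments without the optional dependency.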
##########
sdks/python/apache_beam/yaml/integration_tests.py:
##########
@@ -618,6 +621,26 @@ def temp_pubsub_emulator(project_id="apache-beam-testing"):
yield created_topic_object.name
+@contextlib.contextmanager
+def temp_delta_table():
+  with tempfile.TemporaryDirectory() as temp_dir:
+    log_dir = os.path.join(temp_dir, "_delta_log")
+    os.makedirs(log_dir, exist_ok=True)
+    table_data = pa.table({"name": ["a", "b", "c"]})
+    parquet_path = os.path.join(temp_dir, "part-00000.parquet")
+    pq.write_table(table_data, parquet_path)
+    file_size = os.path.getsize(parquet_path)
+    commit_content = (
+        '{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}\n'
+        '{"metaData":{"id":"test-id","format":{"provider":"parquet","options":{}},"schemaString":"{\\"type\\":\\"struct\\",\\"fields\\":[{\\"name\\":\\"name\\",\\"type\\":\\"string\\",\\"nullable\\":true,\\"metadata\\":{}}]}","partitionColumns":[],"configuration":{},"createdAt":123456789}}\n'
+        f'{{"add":{{"path":"part-00000.parquet","partitionValues":{{}},"size":{file_size},"modificationTime":123456789,"dataChange":true}}}}\n'
+    )
+    commit_file = os.path.join(log_dir, "00000000000000000000.json")
+    with open(commit_file, "w") as f:
+      f.write(commit_content)
Review Comment:

Specify `encoding="utf-8"` when opening the file to write the JSON content.
This ensures cross-platform compatibility and avoids potential encoding issues
on systems where the default encoding is not UTF-8 (e.g., Windows).
```suggestion
    with open(commit_file, "w", encoding="utf-8") as f:
      f.write(commit_content)
```
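
As a related sketch (not what the PR does), the same commit content could be produced with `json.dumps` instead of hand-escaped string literals, written with an explicit encoding. The helper name `write_commit` is hypothetical; the field values are copied from the diff above.

```python
import json
import os
import tempfile


def write_commit(log_dir, file_size):
  """Hypothetical helper: writes a version-0 commit as newline-delimited JSON."""
  schema = {
      "type": "struct",
      "fields": [{
          "name": "name", "type": "string", "nullable": True, "metadata": {}
      }],
  }
  actions = [
      {"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}},
      {
          "metaData": {
              "id": "test-id",
              "format": {"provider": "parquet", "options": {}},
              # schemaString is itself a JSON document stored as a string.
              "schemaString": json.dumps(schema),
              "partitionColumns": [],
              "configuration": {},
              "createdAt": 123456789,
          }
      },
      {
          "add": {
              "path": "part-00000.parquet",
              "partitionValues": {},
              "size": file_size,
              "modificationTime": 123456789,
              "dataChange": True,
          }
      },
  ]
  commit_file = os.path.join(log_dir, "00000000000000000000.json")
  # Explicit encoding so the bytes on disk do not depend on the platform.
  with open(commit_file, "w", encoding="utf-8") as f:
    for action in actions:
      f.write(json.dumps(action) + "\n")
  return commit_file
```

Letting `json.dumps` handle the nested escaping of `schemaString` removes the need for the doubled backslashes in the literal.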
##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -565,6 +565,29 @@ def read_from_iceberg(
config_properties=config_properties))
+def read_from_delta(
+    table: str,
+    version: Optional[int] = None,
+    timestamp: Optional[str] = None,
+    hadoop_config: Optional[Mapping[str, str]] = None,
+):
Review Comment:

Using `Mapping` might raise a `NameError` if it is not imported in this
file. It is safer to use `Dict` (which is already imported and widely used in
this module) for the type hint of `hadoop_config`.
```suggestion
def read_from_delta(
    table: str,
    version: Optional[int] = None,
    timestamp: Optional[str] = None,
    hadoop_config: Optional[Dict[str, str]] = None,
):
```
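
The failure mode can be reproduced in isolation with the sketch below. It assumes, as the comment does, that `Mapping` is not imported in the module; the helper `annotation_resolves` and the two source strings are hypothetical. Depending on the Python version and on whether `from __future__ import annotations` is in effect, the `NameError` may surface at definition time or only when the annotations are inspected, so the helper forces evaluation.

```python
WITH_DICT = (
    "from typing import Dict, Optional\n"
    "def f(hadoop_config: Optional[Dict[str, str]] = None):\n"
    "  return hadoop_config\n")

WITHOUT_IMPORT = (
    "from typing import Optional\n"
    "def f(hadoop_config: Optional[Mapping[str, str]] = None):\n"
    "  return hadoop_config\n")


def annotation_resolves(source, func_name):
  """Returns True if the function's annotations evaluate without NameError."""
  namespace = {}
  try:
    exec(source, namespace)
    # Force evaluation on interpreters that defer annotation evaluation.
    namespace[func_name].__annotations__
    return True
  except NameError:
    return False
```

Importing `Mapping` from `typing` or `collections.abc` would be an alternative fix that keeps the more general type hint.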
##########
sdks/python/apache_beam/yaml/integration_tests.py:
##########
@@ -618,6 +621,26 @@ def temp_pubsub_emulator(project_id="apache-beam-testing"):
yield created_topic_object.name
+@contextlib.contextmanager
+def temp_delta_table():
+  with tempfile.TemporaryDirectory() as temp_dir:
Review Comment:

Import `pyarrow` and `pyarrow.parquet` lazily inside the context manager to
avoid top-level import issues when `pyarrow` is not installed in the
environment.
```suggestion
@contextlib.contextmanager
def temp_delta_table():
  import pyarrow as pa
  import pyarrow.parquet as pq
  with tempfile.TemporaryDirectory() as temp_dir:
```