gemini-code-assist[bot] commented on code in PR #38590:
URL: https://github.com/apache/beam/pull/38590#discussion_r3283928146
##########
sdks/python/apache_beam/coders/coder_impl.py:
##########
@@ -441,6 +449,22 @@ def encode_to_stream(self, value, stream, nested):
elif t is bool:
stream.write_byte(BOOL_TYPE)
stream.write_byte(value)
+ elif t is datetime.datetime:
+ # We use RFC 9557 for lossless encoding of timezone info.
+ stream.write_byte(DATETIME_TYPE)
+ stream.write(value.isoformat().encode("utf-8"))
+ if value.tzinfo is not None and type(
+ value.tzinfo) is not datetime.timezone:
+ stream.write(f"[{value.tzinfo}]".encode("utf-8"))
+ if type(
+ value.tzinfo) is datetime.timezone and (tzname :=
+ value.tzname()) is not None:
+ stream.write(f"[tzn={tzname}]".encode("utf-8"))
+ if value.fold != 0:
+ stream.write(f"[f={value.fold}]".encode("utf-8"))
Review Comment:

The current `datetime` encoding has two issues:
1. It issues several `stream.write` calls. With `nested=True`, each call
could produce its own length-prefixed block (depending on the
`OutputStream` implementation), while the decoder makes a single
`stream.read_all(nested)` call and would read only the first block.
2. None of the `stream.write` calls pass `nested`, so the encoded bytes will
not match what the decoder's `read_all(nested)` expects when the coder is used
in a nested context (e.g., as an element of a list).

Build the full string first, then write it once with the `nested` argument.
```suggestion
elif t is datetime.datetime:
  # We use RFC 9557 for lossless encoding of timezone info.
  stream.write_byte(DATETIME_TYPE)
  dt_str = value.isoformat()
  if (value.tzinfo is not None and
      type(value.tzinfo) is not datetime.timezone):
    dt_str += f"[{value.tzinfo}]"
  if (type(value.tzinfo) is datetime.timezone and
      (tzname := value.tzname()) is not None):
    dt_str += f"[tzn={tzname}]"
  if value.fold != 0:
    dt_str += f"[f={value.fold}]"
  stream.write(dt_str.encode("utf-8"), nested)
```
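
As a rough illustration of the single-write approach, the suffix construction can be pulled into a helper that returns the complete payload, so the caller needs only one `stream.write(payload, nested)`. The name `format_rfc9557` is hypothetical and used only for this sketch; it is not part of Beam.

```python
import datetime


def format_rfc9557(value):
  """Returns the full encoded payload for a datetime as one bytes object."""
  parts = [value.isoformat()]
  tz = value.tzinfo
  if tz is not None and type(tz) is not datetime.timezone:
    # Named zones such as zoneinfo.ZoneInfo stringify to their key.
    parts.append(f"[{tz}]")
  if type(tz) is datetime.timezone and (tzname := value.tzname()) is not None:
    # Fixed-offset zones keep their display name in a separate tag.
    parts.append(f"[tzn={tzname}]")
  if value.fold != 0:
    parts.append(f"[f={value.fold}]")
  return "".join(parts).encode("utf-8")
```

Because the helper returns bytes, the framing decision (`nested` or not) stays in exactly one place at the call site.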
##########
sdks/python/apache_beam/coders/coders_test_common.py:
##########
@@ -418,52 +428,83 @@ def test_fake_deterministic_fast_primitives_coder(self):
self.check_coder(coders.TupleCoder((coder, )), (v, ))
def test_bytes_coder(self):
- self.check_coder(coders.BytesCoder(), b'a', b'\0', b'z' * 1000)
+ self.check_coder(coders.BytesCoder(), b"a", b"\0", b"z" * 1000)
def test_bool_coder(self):
self.check_coder(coders.BooleanCoder(), True, False)
+ def test_fast_primitives_coder_datetime(self):
+ tz = zoneinfo.ZoneInfo("America/New_York")
Review Comment:

`zoneinfo` was added in Python 3.9, so this test will fail on Python 3.8.
Skip the test, or only the part that needs `zoneinfo`, when the module is not
available. The check below assumes a guarded import that sets `zoneinfo` to
`None` on failure.
```python
def test_fast_primitives_coder_datetime(self):
  if zoneinfo is None:
    self.skipTest("zoneinfo module not available")
  tz = zoneinfo.ZoneInfo("America/New_York")
```
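
An alternative to calling `self.skipTest` inside the test body is the `unittest.skipIf` decorator. The sketch below is standalone and only illustrative: the class and test names are made up, and the extra skip for a missing time zone database is an assumption about the environment rather than something the PR requires.

```python
import unittest

try:
  import zoneinfo
except ImportError:  # Python < 3.9
  zoneinfo = None


class NamedZoneTest(unittest.TestCase):
  @unittest.skipIf(zoneinfo is None, "zoneinfo module not available")
  def test_zone_stringifies_to_key(self):
    try:
      tz = zoneinfo.ZoneInfo("America/New_York")
    except zoneinfo.ZoneInfoNotFoundError:
      # Assumption: some hosts ship zoneinfo without a tz database.
      self.skipTest("time zone database not available")
    self.assertEqual(str(tz), "America/New_York")
```

The decorator keeps the skip condition visible at the definition, while the in-body `skipTest` form is handy when only part of a larger test depends on `zoneinfo`.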
##########
sdks/python/apache_beam/coders/coder_impl.py:
##########
@@ -614,6 +645,46 @@ def decode_from_stream(self, stream, nested):
return v
elif t == BOOL_TYPE:
return not not stream.read_byte()
+ elif t == DATETIME_TYPE:
+ rfc_9557_str = stream.read_all(nested).decode("utf-8")
+ first_tag_idx = rfc_9557_str.find("[")
+ if first_tag_idx == -1:
+ return datetime.datetime.fromisoformat(rfc_9557_str)
+
+ base_iso = rfc_9557_str[:first_tag_idx]
+ tags_str = rfc_9557_str[first_tag_idx:]
+ dt = datetime.datetime.fromisoformat(base_iso)
+
+ fold = 0
+ zone_name = None
+ tz_name = None
+
+ tags = tags_str.replace("]", "").split("[")
+ for tag in tags:
+ if not tag:
+ continue
+ if tag.startswith("f="):
+ fold = int(tag[2:])
+ elif tag.startswith("tzn="):
+ tz_name = tag[4:]
+ elif "=" in tag:
+ # Skip unknown tags like [knort=blorgel]
+ continue
+ else:
+ zone_name = tag
+
+ if tz_name and (offset := dt.utcoffset()) is not None:
+ dt = dt.replace(tzinfo=datetime.timezone(offset=offset, name=tz_name))
+ elif zone_name:
+ dt = dt.replace(tzinfo=zoneinfo.ZoneInfo(zone_name))
Review Comment:

If `zoneinfo` is unavailable (e.g., on Python 3.8), this line will raise a
`NameError` or an `AttributeError`, depending on how the failed import is
handled. Check that `zoneinfo` is available before using it. When it is not,
the datetime still carries the correct UTC offset parsed from the ISO string
by `fromisoformat`, which is a reasonable fallback.
```suggestion
elif zone_name and zoneinfo is not None:
  dt = dt.replace(tzinfo=zoneinfo.ZoneInfo(zone_name))
```
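
The fallback described here can be sketched in isolation. `attach_zone` is a hypothetical helper, not code from the PR, and catching `ZoneInfoNotFoundError` is an extra assumption about hosts that lack a time zone database. In both fallback paths the datetime keeps the fixed offset that `fromisoformat` already parsed.

```python
import datetime

try:
  import zoneinfo
except ImportError:  # Python < 3.9
  zoneinfo = None


def attach_zone(dt, zone_name):
  """Swaps in a named zone when possible, else keeps dt's parsed offset."""
  if zone_name and zoneinfo is not None:
    try:
      return dt.replace(tzinfo=zoneinfo.ZoneInfo(zone_name))
    except zoneinfo.ZoneInfoNotFoundError:
      # Assumption: treat a missing tz database like a missing module.
      return dt
  return dt


parsed = datetime.datetime.fromisoformat("2024-01-02T03:04:05-05:00")
restored = attach_zone(parsed, "America/New_York")
```

Whether or not the named zone can be loaded, `restored` represents the same instant as `parsed`; only the ability to apply the zone's DST rules to later arithmetic is lost in the fallback case.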
##########
sdks/python/apache_beam/coders/coder_impl.py:
##########
@@ -441,6 +449,22 @@ def encode_to_stream(self, value, stream, nested):
elif t is bool:
stream.write_byte(BOOL_TYPE)
stream.write_byte(value)
+ elif t is datetime.datetime:
+ # We use RFC 9557 for lossless encoding of timezone info.
+ stream.write_byte(DATETIME_TYPE)
+ stream.write(value.isoformat().encode("utf-8"))
+ if value.tzinfo is not None and type(
+ value.tzinfo) is not datetime.timezone:
+ stream.write(f"[{value.tzinfo}]".encode("utf-8"))
+ if type(
+ value.tzinfo) is datetime.timezone and (tzname :=
+ value.tzname()) is not None:
+ stream.write(f"[tzn={tzname}]".encode("utf-8"))
+ if value.fold != 0:
+ stream.write(f"[f={value.fold}]".encode("utf-8"))
+ elif t is datetime.date:
+ stream.write_byte(DATE_TYPE)
+ stream.write(value.isoformat().encode("utf-8"))
Review Comment:

The `stream.write` call for `date` omits the `nested` argument, so the encoded
bytes will not match what the decoder's `read_all(nested)` expects when this
coder is used in a nested context.
```suggestion
elif t is datetime.date:
  stream.write_byte(DATE_TYPE)
  stream.write(value.isoformat().encode("utf-8"), nested)
```
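
To make the mismatch concrete, here is a toy model of length-prefixed framing. `ToyStream` is a hypothetical stand-in written for this example, with a 4-byte length prefix chosen for simplicity; it is not Beam's stream implementation, and it only assumes that a nested write prepends a length which a nested read then consumes.

```python
import io
import struct


class ToyStream:
  """Hypothetical stand-in for a coder stream; not Beam's implementation."""
  def __init__(self, data=b""):
    self._buf = io.BytesIO(data)

  def write(self, b, nested=False):
    # Assumption: a nested write prepends the payload length.
    if nested:
      self._buf.write(struct.pack(">I", len(b)))
    self._buf.write(b)

  def read_all(self, nested=False):
    # A nested read consumes the length prefix, then exactly that many bytes.
    if nested:
      (size, ) = struct.unpack(">I", self._buf.read(4))
      return self._buf.read(size)
    return self._buf.read()

  def getvalue(self):
    return self._buf.getvalue()


def first_element(write_nested):
  """Writes a date followed by another element, then reads the date back."""
  out = ToyStream()
  out.write(b"2024-01-02", write_nested)
  out.write(b"next", write_nested)
  return ToyStream(out.getvalue()).read_all(nested=True)
```

In this model, `first_element(True)` recovers the date bytes, while `first_element(False)` misreads the first four payload bytes as a length and returns the wrong slice.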
##########
sdks/python/apache_beam/coders/coder_impl.py:
##########
@@ -28,16 +28,19 @@
For internal use only; no backwards-compatibility guarantees.
"""
+
# pytype: skip-file
# ruff: noqa: UP006
import dataclasses
+import datetime
import decimal
import enum
import itertools
import json
import logging
import pickle
+import zoneinfo
Review Comment:

The `zoneinfo` module was introduced in Python 3.9. Since Apache Beam still
supports Python 3.8, this direct import will cause an `ImportError` on those
versions. Consider using a conditional import to maintain compatibility.
```suggestion
try:
  import zoneinfo
except ImportError:
  zoneinfo = None  # type: ignore
```
##########
sdks/python/apache_beam/coders/coders_test_common.py:
##########
@@ -29,6 +31,7 @@
import sys
import textwrap
import unittest
+import zoneinfo
Review Comment:

The `zoneinfo` module is not available in Python 3.8. To ensure tests pass
on all supported Python versions, use a conditional import.
```suggestion
try:
  import zoneinfo
except ImportError:
  zoneinfo = None
```