gemini-code-assist[bot] commented on code in PR #35951:
URL: https://github.com/apache/beam/pull/35951#discussion_r2298813787
##########
sdks/python/apache_beam/transforms/sideinputs_test.py:
##########
@@ -417,6 +426,71 @@
use_global_window=False,
label='assert per window')
+ @pytest.mark.it_validatesrunner
+ def test_side_input_with_sdf(self):
+ """Test a side input with SDF.
+
+ This test verifies consistency of side input when it is split due to
+ SDF (Splittable DoFns). The consistency is verified by checking the size
+ and fingerprint of the side input.
+
+ This test needs to run with at least 2 workers (--num_workers=2) and
+ autoscaling disabled (--autoscaling_algorithm=NONE). Otherwise it might
+ provide false positives (i.e. not fail on bad state).
+ """
+ initial_elements = 1000
+ num_records = 10000
+ key_size = 10
+ value_size = 100
+ expected_fingerprint = '00f7eeac8514721e2683d14a504b33d1'
+
+ class GetSyntheticSDFOptions(beam.DoFn):
+ """A DoFn that emits elements for generating SDF."""
+ def process(self, element: Any) -> Iterable[Dict[str, Union[int, str]]]:
+ yield {
+ 'num_records': num_records // initial_elements,
+ 'key_size': key_size,
+ 'value_size': value_size,
+ 'initial_splitting_num_bundles': 0,
+ 'initial_splitting_desired_bundle_size': 0,
+ 'sleep_per_input_record_sec': 0,
+ 'initial_splitting': 'const',
+ }
+
+ class SideInputTrackingDoFn(beam.DoFn):
+ """A DoFn that emits the size and fingerprint of the side input.
+
+ In this context, the size is the number of elements and the fingerprint
+ is the hash of the sorted serialized elements.
+ """
+ def process(
+ self, element: Any,
+ side_input: Iterable[Tuple[bytes,
+ bytes]]) -> Iterable[Tuple[int, str]]:
+
+ side_input_list = list(side_input)
+ size = len(side_input_list)
+ # Sort for consistent hashing.
+ m = hashlib.md5()
+ for key, value in sorted(side_input_list):
Review Comment:

The current implementation materializes the `side_input` iterable into a
list and then sorts it, which creates two lists. Calling `sorted()` directly
on the iterable creates only one list and is arguably more readable, since
`sorted()` accepts any iterable and always returns a new list.
```suggestion
# Sort for consistent hashing.
sorted_side_input = sorted(side_input)
size = len(sorted_side_input)
m = hashlib.md5()
for key, value in sorted_side_input:
```
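
For illustration, the suggested pattern can be sketched as a standalone helper. This is a minimal sketch, not the PR's code: the name `size_and_fingerprint` is hypothetical, and feeding each key and value into the hash with `m.update()` is an assumption, since the loop body is not visible in the diff hunk above.

```python
import hashlib
from typing import Iterable, Tuple


def size_and_fingerprint(
    side_input: Iterable[Tuple[bytes, bytes]]) -> Tuple[int, str]:
  """Returns (element count, MD5 hex digest) of the sorted side input.

  Hypothetical helper for illustration only.
  """
  # sorted() accepts any iterable and returns a new list, so a separate
  # list(side_input) copy is not needed before sorting.
  sorted_side_input = sorted(side_input)
  m = hashlib.md5()
  for key, value in sorted_side_input:
    # Assumption: the fingerprint hashes the raw key and value bytes in order.
    m.update(key)
    m.update(value)
  return len(sorted_side_input), m.hexdigest()
```

Because the elements are sorted before hashing, the result should not depend on the order in which the side input is iterated, and the helper also works when `side_input` is a one-shot iterator.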
##########
sdks/python/apache_beam/transforms/sideinputs_test.py:
##########
@@ -37,6 +39,13 @@
from apache_beam.utils.timestamp import Timestamp
+def fingerprint_list(elements: List[Any]) -> str:
+ """Computes a stable fingerprint for an iterable of elements."""
+ # Sort and convert to string for consistent hashing.
+ s = str(sorted(elements))
+ return hashlib.md5(s.encode()).hexdigest()
+
Review Comment:

This `fingerprint_list` function is not used anywhere in the file. It seems
like it was intended for use, but a different implementation was chosen for the
fingerprint calculation inside `SideInputTrackingDoFn`. To keep the code clean,
this unused function should be removed.