This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4d9b4363ae6 Consider windows with bands outside of [Min, Max] equivalent (#35460)
4d9b4363ae6 is described below
commit 4d9b4363ae68056a24afffff5e36cedfe1a3b4f1
Author: Danny McCormick <[email protected]>
AuthorDate: Tue Jul 1 10:32:31 2025 -0400
Consider windows with bands outside of [Min, Max] equivalent (#35460)
---
sdks/python/apache_beam/utils/windowed_value.py | 23 ++++++++++++++++++-----
1 file changed, 18 insertions(+), 5 deletions(-)
diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py
index f6232ce2f6b..2775ec4061b 100644
--- a/sdks/python/apache_beam/utils/windowed_value.py
+++ b/sdks/python/apache_beam/utils/windowed_value.py
@@ -432,13 +432,26 @@ class _IntervalWindowBase(object):
return self._end_object
def __hash__(self):
- return hash((self._start_micros, self._end_micros))
+ # Cut off window at start/end timestamps for comparison purposes since some
+ # portable runners do this already, and timestamps outside of the bands of
+ # Min/Max timestamps are functionally equal to Min/Max.
+ start = max(self._start_micros, MIN_TIMESTAMP.micros)
+ end = min(self._end_micros, MAX_TIMESTAMP.micros)
+ return hash((start, end))
def __eq__(self, other):
- return (
- type(self) == type(other) and
- self._start_micros == other._start_micros and
- self._end_micros == other._end_micros)
+ if type(self) != type(other):
+ return False
+
+ # Cut off window at start/end timestamps for comparison purposes since some
+ # portable runners do this already, and timestamps outside of the bands of
+ # Min/Max timestamps are functionally equal to Min/Max.
+ self_start = max(self._start_micros, MIN_TIMESTAMP.micros)
+ self_end = min(self._end_micros, MAX_TIMESTAMP.micros)
+ other_start = max(other._start_micros, MIN_TIMESTAMP.micros)
+ other_end = min(other._end_micros, MAX_TIMESTAMP.micros)
+
+ return (self_start == other_start and self_end == other_end)
def __repr__(self):
return '[%s, %s)' % (float(self.start), float(self.end))