[ 
https://issues.apache.org/jira/browse/BEAM-3759?focusedWorklogId=82038&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82038
 ]

ASF GitHub Bot logged work on BEAM-3759:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Mar/18 20:57
            Start Date: 19/Mar/18 20:57
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #4763: [BEAM-3759] Add support for PaneInfo in WindowedValues
URL: https://github.com/apache/beam/pull/4763
 
 
   

This pull request was merged from a forked repository. Because GitHub does
not show the original diff of a fork's pull request once it has been merged,
the diff is reproduced below for the sake of provenance:

diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd 
b/sdks/python/apache_beam/coders/coder_impl.pxd
index 8af394b6686..98dd508556a 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -132,11 +132,16 @@ cdef class IterableCoderImpl(SequenceCoderImpl):
   pass
 
 
+cdef class PaneInfoCoderImpl(StreamCoderImpl):
+  cdef int _choose_encoding(self, value)
+
+
 cdef class WindowedValueCoderImpl(StreamCoderImpl):
   """A coder for windowed values."""
   cdef CoderImpl _value_coder
   cdef CoderImpl _timestamp_coder
   cdef CoderImpl _windows_coder
+  cdef CoderImpl _pane_info_coder
 
   @cython.locals(c=CoderImpl)
   cpdef get_estimated_size_and_observables(self, value, bint nested=?)
diff --git a/sdks/python/apache_beam/coders/coder_impl.py 
b/sdks/python/apache_beam/coders/coder_impl.py
index d47357df8c9..cc7ed87c3ad 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -672,6 +672,82 @@ def _construct_from_sequence(self, components):
     return components
 
 
+class PaneInfoEncoding(object):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Encoding used to describe a PaneInfo descriptor.  A PaneInfo descriptor
+  can be encoded in three different ways: with a single byte (FIRST), with a
+  single byte followed by a varint describing a single index (ONE_INDEX) or
+  with a single byte followed by two varints describing two separate indices:
+  the index and nonspeculative index.
+  """
+
+  FIRST = 0
+  ONE_INDEX = 1
+  TWO_INDICES = 2
+
+
+class PaneInfoCoderImpl(StreamCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Coder for a PaneInfo descriptor."""
+
+  def _choose_encoding(self, value):
+    if ((value.index == 0 and value.nonspeculative_index == 0) or
+        value.timing == windowed_value.PaneInfoTiming.UNKNOWN):
+      return PaneInfoEncoding.FIRST
+    elif (value.index == value.nonspeculative_index or
+          value.timing == windowed_value.PaneInfoTiming.EARLY):
+      return PaneInfoEncoding.ONE_INDEX
+    else:
+      return PaneInfoEncoding.TWO_INDICES
+
+  def encode_to_stream(self, value, out, nested):
+    encoding_type = self._choose_encoding(value)
+    out.write_byte(value.encoded_byte | (encoding_type << 4))
+    if encoding_type == PaneInfoEncoding.FIRST:
+      return
+    elif encoding_type == PaneInfoEncoding.ONE_INDEX:
+      out.write_var_int64(value.index)
+    elif encoding_type == PaneInfoEncoding.TWO_INDICES:
+      out.write_var_int64(value.index)
+      out.write_var_int64(value.nonspeculative_index)
+    else:
+      raise NotImplementedError('Invalid PaneInfoEncoding: %s' % encoding_type)
+
+  def decode_from_stream(self, in_stream, nested):
+    encoded_first_byte = in_stream.read_byte()
+    base = windowed_value._BYTE_TO_PANE_INFO[encoded_first_byte & 0xF]
+    assert base is not None
+    encoding_type = encoded_first_byte >> 4
+    if encoding_type == PaneInfoEncoding.FIRST:
+      return base
+    elif encoding_type == PaneInfoEncoding.ONE_INDEX:
+      index = in_stream.read_var_int64()
+      if base.timing == windowed_value.PaneInfoTiming.EARLY:
+        nonspeculative_index = -1
+      else:
+        nonspeculative_index = index
+    elif encoding_type == PaneInfoEncoding.TWO_INDICES:
+      index = in_stream.read_var_int64()
+      nonspeculative_index = in_stream.read_var_int64()
+    else:
+      raise NotImplementedError('Invalid PaneInfoEncoding: %s' % encoding_type)
+    return windowed_value.PaneInfo(
+        base.is_first, base.is_last, base.timing, index, nonspeculative_index)
+
+  def estimate_size(self, value, nested=False):
+    """Estimates the encoded size of the given value, in bytes."""
+    size = 1
+    encoding_type = self._choose_encoding(value)
+    if encoding_type == PaneInfoEncoding.ONE_INDEX:
+      size += get_varint_size(value.index)
+    elif encoding_type == PaneInfoEncoding.TWO_INDICES:
+      size += get_varint_size(value.index)
+      size += get_varint_size(value.nonspeculative_index)
+    return size
+
+
 class WindowedValueCoderImpl(StreamCoderImpl):
   """For internal use only; no backwards-compatibility guarantees.
 
@@ -694,6 +770,7 @@ def __init__(self, value_coder, timestamp_coder, 
window_coder):
     self._value_coder = value_coder
     self._timestamp_coder = timestamp_coder
     self._windows_coder = TupleSequenceCoderImpl(window_coder)
+    self._pane_info_coder = PaneInfoCoderImpl()
 
   def encode_to_stream(self, value, out, nested):
     wv = value  # type cast
@@ -709,8 +786,7 @@ def encode_to_stream(self, value, out, nested):
             restore_sign * (abs(wv.timestamp_micros) / 1000)))
     self._windows_coder.encode_to_stream(wv.windows, out, True)
     # Default PaneInfo encoded byte representing NO_FIRING.
-    # TODO(BEAM-1522): Remove the hard coding here once PaneInfo is supported.
-    out.write_byte(0xF)
+    self._pane_info_coder.encode_to_stream(wv.pane_info, out, True)
     self._value_coder.encode_to_stream(wv.value, out, nested)
 
   def decode_from_stream(self, in_stream, nested):
@@ -734,15 +810,14 @@ def decode_from_stream(self, in_stream, nested):
 
     windows = self._windows_coder.decode_from_stream(in_stream, True)
     # Read PaneInfo encoded byte.
-    # TODO(BEAM-1522): Ignored for now but should be converted to pane info 
once
-    # it is supported.
-    in_stream.read_byte()
+    pane_info = self._pane_info_coder.decode_from_stream(in_stream, True)
     value = self._value_coder.decode_from_stream(in_stream, nested)
     return windowed_value.create(
         value,
         # Avoid creation of Timestamp object.
         timestamp,
-        windows)
+        windows,
+        pane_info)
 
   def get_estimated_size_and_observables(self, value, nested=False):
     """Returns estimated size of value along with any nested observables."""
@@ -761,8 +836,8 @@ def get_estimated_size_and_observables(self, value, 
nested=False):
         self._timestamp_coder.estimate_size(value.timestamp, nested=True))
     estimated_size += (
         self._windows_coder.estimate_size(value.windows, nested=True))
-    # for pane info
-    estimated_size += 1
+    estimated_size += (
+        self._pane_info_coder.estimate_size(value.pane_info, nested=True))
     return estimated_size, observables
 
 
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py 
b/sdks/python/apache_beam/coders/coders_test_common.py
index fc7279d5e01..0ea7da2b6ad 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -275,6 +275,38 @@ def iter_generator(count):
                           iterable_coder.decode(
                               iterable_coder.encode(iter_generator(count))))
 
+  def test_windowedvalue_coder_paneinfo(self):
+    coder = coders.WindowedValueCoder(coders.VarIntCoder(),
+                                      coders.GlobalWindowCoder())
+    test_paneinfo_values = [
+        windowed_value.PANE_INFO_UNKNOWN,
+        windowed_value.PaneInfo(
+            True, True, windowed_value.PaneInfoTiming.EARLY, 0, -1),
+        windowed_value.PaneInfo(
+            True, False, windowed_value.PaneInfoTiming.ON_TIME, 0, 0),
+        windowed_value.PaneInfo(
+            True, False, windowed_value.PaneInfoTiming.ON_TIME, 10, 0),
+        windowed_value.PaneInfo(
+            False, True, windowed_value.PaneInfoTiming.ON_TIME, 0, 23),
+        windowed_value.PaneInfo(
+            False, True, windowed_value.PaneInfoTiming.ON_TIME, 12, 23),
+        windowed_value.PaneInfo(
+            False, False, windowed_value.PaneInfoTiming.LATE, 0, 123),]
+
+    test_values = [windowed_value.WindowedValue(123, 234, (GlobalWindow(),), p)
+                   for p in test_paneinfo_values]
+
+    # Test unnested.
+    self.check_coder(coder, windowed_value.WindowedValue(
+        123, 234, (GlobalWindow(),), windowed_value.PANE_INFO_UNKNOWN))
+    for value in test_values:
+      self.check_coder(coder, value)
+
+    # Test nested.
+    for value1 in test_values:
+      for value2 in test_values:
+        self.check_coder(coders.TupleCoder((coder, coder)), (value1, value2))
+
   def test_windowed_value_coder(self):
     coder = coders.WindowedValueCoder(coders.VarIntCoder(),
                                       coders.GlobalWindowCoder())
diff --git a/sdks/python/apache_beam/utils/windowed_value.pxd 
b/sdks/python/apache_beam/utils/windowed_value.pxd
index 5544baf73e0..710f22a8798 100644
--- a/sdks/python/apache_beam/utils/windowed_value.pxd
+++ b/sdks/python/apache_beam/utils/windowed_value.pxd
@@ -25,6 +25,7 @@ cdef type Timestamp
 cdef class WindowedValue(object):
   cdef public object value
   cdef public object windows
+  cdef public object pane_info
   cdef public int64_t timestamp_micros
   cdef object timestamp_object
 
@@ -35,4 +36,4 @@ cdef class WindowedValue(object):
 
 @cython.locals(wv=WindowedValue)
 cpdef WindowedValue create(
-  object value, int64_t timestamp_micros, object windows)
+  object value, int64_t timestamp_micros, object windows, object pane_info=*)
diff --git a/sdks/python/apache_beam/utils/windowed_value.py 
b/sdks/python/apache_beam/utils/windowed_value.py
index be2785432a1..1b3228b2e6e 100644
--- a/sdks/python/apache_beam/utils/windowed_value.py
+++ b/sdks/python/apache_beam/utils/windowed_value.py
@@ -32,6 +32,111 @@
 from apache_beam.utils.timestamp import Timestamp
 
 
+class PaneInfoTiming(object):
+  """The timing of a PaneInfo."""
+
+  EARLY = 0
+  ON_TIME = 1
+  LATE = 2
+  UNKNOWN = 3
+
+
+class PaneInfo(object):
+  """Describes the trigger firing information for a given WindowedValue."""
+
+  def __init__(self, is_first, is_last, timing, index, nonspeculative_index):
+    self._is_first = is_first
+    self._is_last = is_last
+    self._timing = timing
+    self._index = index
+    self._nonspeculative_index = nonspeculative_index
+    self._encoded_byte = self._get_encoded_byte()
+
+  def _get_encoded_byte(self):
+    byte = 0
+    if self.is_first:
+      byte |= 1
+    if self.is_last:
+      byte |= 2
+    byte |= self.timing << 2
+    return byte
+
+  @staticmethod
+  def from_encoded_byte(encoded_byte):
+    assert encoded_byte in _BYTE_TO_PANE_INFO
+    return _BYTE_TO_PANE_INFO[encoded_byte]
+
+  # Because common PaneInfo objects are cached, it is important that the value
+  # is immutable.  We therefore explicitly enforce this here with read-only
+  # properties.
+
+  @property
+  def is_first(self):
+    return self._is_first
+
+  @property
+  def is_last(self):
+    return self._is_last
+
+  @property
+  def timing(self):
+    return self._timing
+
+  @property
+  def index(self):
+    return self._index
+
+  @property
+  def nonspeculative_index(self):
+    return self._nonspeculative_index
+
+  @property
+  def encoded_byte(self):
+    return self._encoded_byte
+
+  def __repr__(self):
+    return ('PaneInfo(first: %r, last: %r, timing: %s, index: %d, '
+            'nonspeculative_index: %d)') % (self.is_first, self.is_last,
+                                            self.timing, self.index,
+                                            self.nonspeculative_index)
+
+  def __eq__(self, other):
+    if self is other:
+      return True
+    return (self.is_first == other.is_first and
+            self.is_last == other.is_last and
+            self.timing == other.timing and
+            self.index == other.index and
+            self.nonspeculative_index == other.nonspeculative_index)
+
+  def __hash__(self):
+    return hash((self.is_first, self.is_last, self.timing, self.index,
+                 self.nonspeculative_index))
+
+
+def _construct_well_known_pane_infos():
+  pane_infos = []
+  for timing in (PaneInfoTiming.EARLY, PaneInfoTiming.ON_TIME,
+                 PaneInfoTiming.LATE, PaneInfoTiming.UNKNOWN):
+    nonspeculative_index = -1 if timing == PaneInfoTiming.EARLY else 0
+    pane_infos.append(PaneInfo(True, True, timing, 0, nonspeculative_index))
+    pane_infos.append(PaneInfo(True, False, timing, 0, nonspeculative_index))
+    pane_infos.append(PaneInfo(False, True, timing, -1, nonspeculative_index))
+    pane_infos.append(PaneInfo(False, False, timing, -1, nonspeculative_index))
+  result = [None] * (max(p.encoded_byte for p in pane_infos) + 1)
+  for pane_info in pane_infos:
+    result[pane_info.encoded_byte] = pane_info
+  return result
+
+
+# Cache of well-known PaneInfo objects.
+_BYTE_TO_PANE_INFO = _construct_well_known_pane_infos()
+
+
+# Default PaneInfo descriptor for when a value is not the output of triggering.
+PANE_INFO_UNKNOWN = _BYTE_TO_PANE_INFO[0xF]
+
+
 class WindowedValue(object):
   """A windowed value having a value, a timestamp and set of windows.
 
@@ -40,9 +145,12 @@ class WindowedValue(object):
     timestamp: Timestamp associated with the value as seconds since Unix epoch.
     windows: A set (iterable) of window objects for the value. The window
       object are descendants of the BoundedWindow class.
+    pane_info: A PaneInfo descriptor describing the triggering information for
+      the pane that contained this value.  If None, will be set to
+      PANE_INFO_UNKNOWN.
   """
 
-  def __init__(self, value, timestamp, windows):
+  def __init__(self, value, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
     # For performance reasons, only timestamp_micros is stored by default
     # (as a C int). The Timestamp object is created on demand below.
     self.value = value
@@ -53,6 +161,7 @@ def __init__(self, value, timestamp, windows):
                                else Timestamp.of(timestamp))
       self.timestamp_micros = self.timestamp_object.micros
     self.windows = windows
+    self.pane_info = pane_info
 
   @property
   def timestamp(self):
@@ -61,15 +170,19 @@ def timestamp(self):
     return self.timestamp_object
 
   def __repr__(self):
-    return '(%s, %s, %s)' % (
+    return '(%s, %s, %s, %s)' % (
         repr(self.value),
         'MIN_TIMESTAMP' if self.timestamp == MIN_TIMESTAMP else
         'MAX_TIMESTAMP' if self.timestamp == MAX_TIMESTAMP else
         float(self.timestamp),
-        self.windows)
+        self.windows,
+        self.pane_info)
 
   def __hash__(self):
-    return hash(self.value) + 3 * self.timestamp_micros + 7 * 
hash(self.windows)
+    return (hash(self.value) +
+            3 * self.timestamp_micros +
+            7 * hash(self.windows) +
+            11 * hash(self.pane_info))
 
   # We'd rather implement __eq__, but Cython supports that via __richcmp__
   # instead.  Fortunately __cmp__ is understood by both (but not by Python 3).
@@ -90,25 +203,29 @@ def __cmp__(left, right):  # pylint: 
disable=no-self-argument
   def _typed_eq(left, right):
     return (left.timestamp_micros == right.timestamp_micros
             and left.value == right.value
-            and left.windows == right.windows)
+            and left.windows == right.windows
+            and left.pane_info == right.pane_info)
 
   def with_value(self, new_value):
     """Creates a new WindowedValue with the same timestamps and windows as 
this.
 
     This is the fasted way to create a new WindowedValue.
     """
-    return create(new_value, self.timestamp_micros, self.windows)
+    return create(new_value, self.timestamp_micros, self.windows,
+                  self.pane_info)
 
   def __reduce__(self):
-    return WindowedValue, (self.value, self.timestamp, self.windows)
+    return WindowedValue, (self.value, self.timestamp, self.windows,
+                           self.pane_info)
 
 
 # TODO(robertwb): Move this to a static method.
-def create(value, timestamp_micros, windows):
+def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   wv = WindowedValue.__new__(WindowedValue)
   wv.value = value
   wv.timestamp_micros = timestamp_micros
   wv.windows = windows
+  wv.pane_info = pane_info
   return wv
 
 
diff --git a/sdks/python/apache_beam/utils/windowed_value_test.py 
b/sdks/python/apache_beam/utils/windowed_value_test.py
index 7883d1dfe85..8c72c8cff9a 100644
--- a/sdks/python/apache_beam/utils/windowed_value_test.py
+++ b/sdks/python/apache_beam/utils/windowed_value_test.py
@@ -35,8 +35,11 @@ def test_timestamps(self):
                      Timestamp.of(-2.5))
 
   def test_with_value(self):
-    wv = windowed_value.WindowedValue(1, 3, ())
-    self.assertEqual(wv.with_value(10), windowed_value.WindowedValue(10, 3, 
()))
+    pane_info = windowed_value.PaneInfo(
+        True, True, windowed_value.PaneInfoTiming.ON_TIME, 0, 0)
+    wv = windowed_value.WindowedValue(1, 3, (), pane_info)
+    self.assertEqual(wv.with_value(10),
+                     windowed_value.WindowedValue(10, 3, (), pane_info))
 
   def test_equality(self):
     self.assertEqual(
@@ -63,7 +66,9 @@ def test_hash(self):
     self.assertEqual({wv: 100}.get(wv_copy), 100)
 
   def test_pickle(self):
-    wv = windowed_value.WindowedValue(1, 3, ())
+    pane_info = windowed_value.PaneInfo(
+        True, True, windowed_value.PaneInfoTiming.ON_TIME, 0, 0)
+    wv = windowed_value.WindowedValue(1, 3, (), pane_info)
     self.assertTrue(pickle.loads(pickle.dumps(wv)) == wv)
 
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 82038)
    Time Spent: 3h 40m  (was: 3.5h)

> Add support for PaneInfo descriptor in Python SDK
> -------------------------------------------------
>
>                 Key: BEAM-3759
>                 URL: https://issues.apache.org/jira/browse/BEAM-3759
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>    Affects Versions: 2.3.0
>            Reporter: Charles Chen
>            Assignee: Charles Chen
>            Priority: Major
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> The PaneInfo descriptor allows a user to determine which particular 
> triggering emitted a value.  This allows the user to differentiate between 
> speculative (early), on-time (at end of window) and late value emissions 
> coming out of a GroupByKey.  We should add support for this feature in the 
> Python SDK.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to