[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=354829=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354829
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 06/Dec/19 00:39
Start Date: 06/Dec/19 00:39
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r354615011
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,49 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+class StateBackedTestElementType(object):
+  element_count = 0
+
+  def __init__(self, num_elems):
+    self.num_elems = num_elems
+    self.value = ['a' for _ in range(num_elems)]
+    StateBackedTestElementType.element_count += 1
+    # Due to using state backed iterable, we expect there is a few instances
+    # alive at any given time.
+    if StateBackedTestElementType.element_count > 5:
+      raise RuntimeError('Too many live instances.')
+
+  def __del__(self):
+    StateBackedTestElementType.element_count -= 1
+
+  def __reduce__(self):
+    return (self.__class__, (self.num_elems, ))
 
 Review comment:
   Updated.
   
   With this change, I believe the motivation is simply to make the element large 
when serialization happens, so the actual element no longer needs to hold a 
blob of data. Though I don't fully get why the latter is necessarily better for 
the purposes of the test (?). 
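The live-instance counter in the diff relies on a lazily consumed iterable keeping only a handful of elements alive at once, while materializing the whole iterable keeps every element alive simultaneously. A plain-Python sketch of that guard follows, assuming CPython's reference counting (so `__del__` runs as soon as the last reference disappears); `CountedElement`, `MAX_LIVE`, `sum_lazily` and `sum_eagerly` are hypothetical names, not part of the PR.

```python
class CountedElement(object):
  """Hypothetical stand-in for StateBackedTestElementType."""
  live_element_count = 0
  MAX_LIVE = 5

  def __init__(self, num_elems):
    self.num_elems = num_elems
    CountedElement.live_element_count += 1
    # Fail loudly if too many instances exist at the same time.
    if CountedElement.live_element_count > CountedElement.MAX_LIVE:
      raise RuntimeError('Too many live instances.')

  def __del__(self):
    # Under CPython refcounting this runs when the last reference is dropped.
    CountedElement.live_element_count -= 1


def sum_lazily(sizes):
  # Each element is created, read, and released before the next one exists,
  # so at most one instance is alive at any moment.
  return sum(CountedElement(n).num_elems for n in sizes)


def sum_eagerly(sizes):
  # Building the full list first keeps every element alive at once, which
  # trips the guard as soon as more than MAX_LIVE elements are requested.
  elements = [CountedElement(n) for n in sizes]
  return sum(e.num_elems for e in elements)
```

Roughly speaking, a state-backed iterable behaves like `sum_lazily`: values are paged in and dropped, so the counter stays small. If the runner instead handed the DoFn a fully materialized list, the test would fail the way `sum_eagerly(range(10))` does.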
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 354829)
Time Spent: 10h 20m  (was: 10h 10m)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 10h 20m
>  Remaining Estimate: 0h
>
> When we have a TimestampedValue on combine:
> {code:python}
> main_stream = (p
>     | 'main TestStream' >> TestStream()
>         .add_elements([window.TimestampedValue(('k', 100), 0)])
>         .add_elements([window.TimestampedValue(('k', 400), 9)])
>         .advance_watermark_to_infinity()
>     | 'main windowInto' >> beam.WindowInto(
>         window.FixedWindows(10),
>         timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
>     | 'Combine' >> beam.CombinePerKey(sum))
> {code}
> The expected timestamps should be:
> LATEST:        (('k', 500), Timestamp(9)),
> EARLIEST:      (('k', 500), Timestamp(0)),
> END_OF_WINDOW: (('k', 500), Timestamp(10)),
> But the current Python streaming implementation gives the following results:
> LATEST:        (('k', 500), Timestamp(10)),
> EARLIEST:      (('k', 500), Timestamp(10)),
> END_OF_WINDOW: (('k', 500), Timestamp(9.)),
> More details and discussions:
> https://lists.apache.org/thread.html/d3af1f2f84a2e59a747196039eae77812b78a991f0f293c717e5f4e1@%3Cdev.beam.apache.org%3E
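The expected values listed in the issue can be restated as a small plain-Python sketch of what each timestamp combiner should produce for the two inputs. It assumes integer-second timestamps and a 10-second fixed window; `fixed_window` and `expected_output_timestamp` are hypothetical helpers, not Beam APIs. Following the issue, END_OF_WINDOW is modeled as the window end (10); Beam itself may instead use the window's max timestamp, just before the end.

```python
def fixed_window(ts, size):
  """Return the half-open [start, end) fixed window containing timestamp ts."""
  start = ts - ts % size
  return start, start + size


def expected_output_timestamp(combiner, timestamps, window):
  """Output timestamp of a combined value, per the issue's expected results."""
  _, end = window
  if combiner == 'EARLIEST':
    return min(timestamps)
  if combiner == 'LATEST':
    return max(timestamps)
  if combiner == 'END_OF_WINDOW':
    # Modeled as the window end, matching the issue's table.
    return end
  raise ValueError('unknown combiner: %r' % combiner)


# The two inputs from the issue: ('k', 100) at t=0 and ('k', 400) at t=9.
elements = [(('k', 100), 0), (('k', 400), 9)]
window = fixed_window(0, 10)
# Both inputs fall into the same [0, 10) window, so they are combined together.
assert all(fixed_window(ts, 10) == window for _, ts in elements)
total = sum(value for (_, value), _ in elements)
timestamps = [ts for _, ts in elements]
for name in ('LATEST', 'EARLIEST', 'END_OF_WINDOW'):
  print(name, (('k', total), expected_output_timestamp(name, timestamps, window)))
```

Running it prints `LATEST (('k', 500), 9)`, `EARLIEST (('k', 500), 0)` and `END_OF_WINDOW (('k', 500), 10)`, which are the "expected" rows from the issue rather than the values the streaming runner reported.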



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=354825=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354825
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 06/Dec/19 00:32
Start Date: 06/Dec/19 00:32
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r354616584
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,49 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+class StateBackedTestElementType(object):
+  element_count = 0
+
+  def __init__(self, num_elems):
+    self.num_elems = num_elems
+    self.value = ['a' for _ in range(num_elems)]
+    StateBackedTestElementType.element_count += 1
+    # Due to using state backed iterable, we expect there is a few instances
+    # alive at any given time.
+    if StateBackedTestElementType.element_count > 5:
+      raise RuntimeError('Too many live instances.')
+
+  def __del__(self):
+    StateBackedTestElementType.element_count -= 1
+
+  def __reduce__(self):
+    return (self.__class__, (self.num_elems, ))
+
+@attr('ValidatesRunner')
+class FnApiBasedStateBackedCoderTest(unittest.TestCase):
+
+  class ElementDoFn(beam.DoFn):
+    def process(self, elements):
+      unused_key, ts = elements
+
+      yield sum([item.num_elems for item in ts])
+
+  def create_pipeline(self):
+    return beam.Pipeline(
 
 Review comment:
   acked.
 



Issue Time Tracking
---

Worklog Id: (was: 354825)
Time Spent: 10h 10m  (was: 10h)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=354823=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354823
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 06/Dec/19 00:32
Start Date: 06/Dec/19 00:32
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r354615011
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,49 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+class StateBackedTestElementType(object):
+  element_count = 0
+
+  def __init__(self, num_elems):
+    self.num_elems = num_elems
+    self.value = ['a' for _ in range(num_elems)]
+    StateBackedTestElementType.element_count += 1
+    # Due to using state backed iterable, we expect there is a few instances
+    # alive at any given time.
+    if StateBackedTestElementType.element_count > 5:
+      raise RuntimeError('Too many live instances.')
+
+  def __del__(self):
+    StateBackedTestElementType.element_count -= 1
+
+  def __reduce__(self):
+    return (self.__class__, (self.num_elems, ))
 
 Review comment:
   Updated.
   
   With this change, I believe the motivation is simply to make the element 
large when serialization happens, so the actual element no longer holds a 
blob of data. Though I don't fully get why it is necessarily better for the 
purposes of the test. 
 



Issue Time Tracking
---

Worklog Id: (was: 354823)
Time Spent: 10h  (was: 9h 50m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=354816=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354816
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 06/Dec/19 00:25
Start Date: 06/Dec/19 00:25
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r354615011
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,49 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+class StateBackedTestElementType(object):
+  element_count = 0
+
+  def __init__(self, num_elems):
+    self.num_elems = num_elems
+    self.value = ['a' for _ in range(num_elems)]
+    StateBackedTestElementType.element_count += 1
+    # Due to using state backed iterable, we expect there is a few instances
+    # alive at any given time.
+    if StateBackedTestElementType.element_count > 5:
+      raise RuntimeError('Too many live instances.')
+
+  def __del__(self):
+    StateBackedTestElementType.element_count -= 1
+
+  def __reduce__(self):
+    return (self.__class__, (self.num_elems, ))
 
 Review comment:
   Updated. I believe the motivation is simply to make the element large when 
serialization happens, so the actual element no longer holds a blob of 
data. 
 



Issue Time Tracking
---

Worklog Id: (was: 354816)
Time Spent: 9h 50m  (was: 9h 40m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=354815=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354815
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 06/Dec/19 00:24
Start Date: 06/Dec/19 00:24
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r354614686
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,49 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+class StateBackedTestElementType(object):
+  element_count = 0
+
+  def __init__(self, num_elems):
+    self.num_elems = num_elems
+    self.value = ['a' for _ in range(num_elems)]
+    StateBackedTestElementType.element_count += 1
+    # Due to using state backed iterable, we expect there is a few instances
+    # alive at any given time.
+    if StateBackedTestElementType.element_count > 5:
+      raise RuntimeError('Too many live instances.')
+
+  def __del__(self):
+    StateBackedTestElementType.element_count -= 1
+
+  def __reduce__(self):
+    return (self.__class__, (self.num_elems, ))
+
+@attr('ValidatesRunner')
+class FnApiBasedStateBackedCoderTest(unittest.TestCase):
+
+  class ElementDoFn(beam.DoFn):
+    def process(self, elements):
+      unused_key, ts = elements
 
 Review comment:
   Ah. This is much better.  Thanks! 
 



Issue Time Tracking
---

Worklog Id: (was: 354815)
Time Spent: 9h 40m  (was: 9.5h)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=354789=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354789
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 05/Dec/19 23:40
Start Date: 05/Dec/19 23:40
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r354603779
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,49 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+class StateBackedTestElementType(object):
+  element_count = 0
 
 Review comment:
   fixed.
 



Issue Time Tracking
---

Worklog Id: (was: 354789)
Time Spent: 9.5h  (was: 9h 20m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=354774=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354774
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 05/Dec/19 23:18
Start Date: 05/Dec/19 23:18
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r354595421
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,49 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+class StateBackedTestElementType(object):
+  element_count = 0
 
 Review comment:
   Just to be clear, `live_element_count`?
 



Issue Time Tracking
---

Worklog Id: (was: 354774)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=354771=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354771
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 05/Dec/19 23:18
Start Date: 05/Dec/19 23:18
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r354594892
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,49 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+class StateBackedTestElementType(object):
+  element_count = 0
+
+  def __init__(self, num_elems):
+    self.num_elems = num_elems
+    self.value = ['a' for _ in range(num_elems)]
 
 Review comment:
   This is never used; you can delete it. 
 



Issue Time Tracking
---

Worklog Id: (was: 354771)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=354772=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354772
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 05/Dec/19 23:18
Start Date: 05/Dec/19 23:18
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r354596030
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,49 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+class StateBackedTestElementType(object):
+  element_count = 0
+
+  def __init__(self, num_elems):
+    self.num_elems = num_elems
+    self.value = ['a' for _ in range(num_elems)]
+    StateBackedTestElementType.element_count += 1
+    # Due to using state backed iterable, we expect there is a few instances
+    # alive at any given time.
+    if StateBackedTestElementType.element_count > 5:
+      raise RuntimeError('Too many live instances.')
+
+  def __del__(self):
+    StateBackedTestElementType.element_count -= 1
+
+  def __reduce__(self):
+    return (self.__class__, (self.num_elems, ))
+
+@attr('ValidatesRunner')
+class FnApiBasedStateBackedCoderTest(unittest.TestCase):
+
+  class ElementDoFn(beam.DoFn):
+    def process(self, elements):
+      unused_key, ts = elements
 
 Review comment:
   You can use MapTuple to avoid unpacking. Then it becomes a one-liner:
   
   `MapTuple(lambda _, vs: sum(e.num_elems for e in vs))`
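`MapTuple` calls the wrapped function with each element unpacked as positional arguments, so a `(key, values)` pair becomes `fn(key, values)`. The sketch below imitates that behaviour in plain Python, without Beam, to check that the suggested lambda computes the same result as the `ElementDoFn` body from the diff; `map_tuple` and `Element` are hypothetical stand-ins.

```python
from collections import namedtuple

# Hypothetical element type carrying only the size the test sums up.
Element = namedtuple('Element', ['num_elems'])


def map_tuple(fn, pcoll):
  """Plain-Python stand-in for beam.MapTuple: calls fn(*element) per element."""
  return [fn(*element) for element in pcoll]


def element_dofn_process(elements):
  """The DoFn body from the diff, with the explicit tuple unpacking."""
  unused_key, ts = elements
  yield sum([item.num_elems for item in ts])


grouped = [('key', [Element(3), Element(4)]), ('other', [Element(10)])]

via_dofn = [out for kv in grouped for out in element_dofn_process(kv)]
via_map_tuple = map_tuple(lambda _, vs: sum(e.num_elems for e in vs), grouped)
print(via_dofn, via_map_tuple)  # [7, 10] [7, 10]
```

The one-liner also drops the intermediate list that `sum([...])` builds, which fits a test whose point is to avoid holding many elements in memory at once.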
 



Issue Time Tracking
---

Worklog Id: (was: 354772)
Time Spent: 9h 20m  (was: 9h 10m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=354775=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354775
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 05/Dec/19 23:18
Start Date: 05/Dec/19 23:18
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r354597042
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,49 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+class StateBackedTestElementType(object):
+  element_count = 0
+
+  def __init__(self, num_elems):
+    self.num_elems = num_elems
+    self.value = ['a' for _ in range(num_elems)]
+    StateBackedTestElementType.element_count += 1
+    # Due to using state backed iterable, we expect there is a few instances
+    # alive at any given time.
+    if StateBackedTestElementType.element_count > 5:
+      raise RuntimeError('Too many live instances.')
+
+  def __del__(self):
+    StateBackedTestElementType.element_count -= 1
+
+  def __reduce__(self):
+    return (self.__class__, (self.num_elems, ))
+
+@attr('ValidatesRunner')
+class FnApiBasedStateBackedCoderTest(unittest.TestCase):
+
+  class ElementDoFn(beam.DoFn):
+    def process(self, elements):
+      unused_key, ts = elements
+
+      yield sum([item.num_elems for item in ts])
+
+  def create_pipeline(self):
+    return beam.Pipeline(
 
 Review comment:
   We'd need to use TestPipeline for this to be a ValidatesRunner test. Of course, in that case 
we couldn't manually pass use_state_iterables (unless we made it a pipeline 
option).
   
   However, I'm +1 for this test going in, with future work getting it to 
run on other runners (possibly just by overriding the create_pipeline method 
altogether). 
 



Issue Time Tracking
---

Worklog Id: (was: 354775)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=354770=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354770
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 05/Dec/19 23:18
Start Date: 05/Dec/19 23:18
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r354595331
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,49 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+class StateBackedTestElementType(object):
+  element_count = 0
+
+  def __init__(self, num_elems):
+    self.num_elems = num_elems
+    self.value = ['a' for _ in range(num_elems)]
+    StateBackedTestElementType.element_count += 1
+    # Due to using state backed iterable, we expect there is a few instances
+    # alive at any given time.
+    if StateBackedTestElementType.element_count > 5:
+      raise RuntimeError('Too many live instances.')
+
+  def __del__(self):
+    StateBackedTestElementType.element_count -= 1
+
+  def __reduce__(self):
+    return (self.__class__, (self.num_elems, ))
 
 Review comment:
   To make the serialized form really big, you could return 
`(self.num_elements, 'x' * self.num_elements)`.
   
   Might make sense to call num_elements size instead. 
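Below is a minimal sketch of this suggestion, with `num_elems` renamed to `size` as proposed. Here `__reduce__` tells pickle to rebuild the object as `PaddedElement(size, 'x' * size)`, so the serialized form grows with `size` while the live object stores no payload. `PaddedElement` and `pickled_payload_size` are hypothetical names. The padding matters because, as noted elsewhere in this review, many runners trigger on serialized size rather than element count.

```python
import pickle


class PaddedElement(object):
    """Small in memory, but its pickled form carries `size` characters of padding."""

    def __init__(self, size, padding=None):
        # `padding` is accepted only so unpickling can pass it back in;
        # it is deliberately not stored, which keeps the live object small.
        self.size = size

    def __reduce__(self):
        # Rebuild as PaddedElement(size, 'x' * size): the serialized form
        # grows with size even though the object holds no large payload.
        return (self.__class__, (self.size, 'x' * self.size))


def pickled_payload_size(element):
    """Bytes pickle would spend on the constructor arguments from __reduce__."""
    _, args = element.__reduce__()
    return len(pickle.dumps(args))
```

Calling `pickle.dumps` on an instance uses the same `__reduce__` hook, so the element looks large to anything that measures its encoded size.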
 


Issue Time Tracking
---

Worklog Id: (was: 354770)
Time Spent: 9h 10m  (was: 9h)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=354769=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354769
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 05/Dec/19 23:18
Start Date: 05/Dec/19 23:18
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r354596302
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,49 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+class StateBackedTestElementType(object):
+  element_count = 0
+
+  def __init__(self, num_elems):
+    self.num_elems = num_elems
+    self.value = ['a' for _ in range(num_elems)]
+    StateBackedTestElementType.element_count += 1
+    # Due to using state backed iterable, we expect there is a few instances
+    # alive at any given time.
+    if StateBackedTestElementType.element_count > 5:
+      raise RuntimeError('Too many live instances.')
+
+  def __del__(self):
+    StateBackedTestElementType.element_count -= 1
+
+  def __reduce__(self):
+    return (self.__class__, (self.num_elems, ))
+
+@attr('ValidatesRunner')
+class FnApiBasedStateBackedCoderTest(unittest.TestCase):
+
+  class ElementDoFn(beam.DoFn):
+    def process(self, elements):
+      unused_key, ts = elements
+
+      yield sum([item.num_elems for item in ts])
+
+  def create_pipeline(self):
+    return beam.Pipeline(
+        runner=fn_api_runner.FnApiRunner(use_state_iterables=True))
+
+  def test_gbk_many_values(self):
+    with self.create_pipeline() as p:
+      # The number of integers could be a knob to test against
 
 Review comment:
   At least make them constants (and use their product below), or perhaps even 
arguments (with defaults) to this test (which make it easy to parameterize 
externally). 
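One way to follow this suggestion, sketched with plain `unittest`. The knobs become module-level constants, and the expected total is their product. The test method takes the knobs as defaulted keyword arguments so other code can call it with different values. `group_and_sum` is a hypothetical in-memory stand-in for `GroupByKey` plus a per-key sum, not a Beam API, and the constant values are arbitrary.

```python
import unittest

# Module-level knobs; the expected grand total is derived from their product.
NUM_KEYS = 3
NUM_VALUES_PER_KEY = 1000


def group_and_sum(pairs):
    """Hypothetical stand-in for GroupByKey followed by a per-key sum."""
    totals = {}
    for key, value in pairs:
        totals[key] = totals.get(key, 0) + value
    return totals


class GbkManyValuesTest(unittest.TestCase):

    def test_gbk_many_values(self, num_keys=NUM_KEYS,
                             num_values=NUM_VALUES_PER_KEY):
        pairs = [(k, 1) for k in range(num_keys) for _ in range(num_values)]
        totals = group_and_sum(pairs)
        # Every key must see all of its values, and the grand total must be
        # exactly the product of the two knobs.
        self.assertEqual(len(totals), num_keys)
        self.assertEqual(sum(totals.values()), num_keys * num_values)
```

The unittest runner calls `test_gbk_many_values()` with the defaults. An external harness could call it directly, e.g. `GbkManyValuesTest('test_gbk_many_values').test_gbk_many_values(num_keys=5, num_values=10)`.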
 


Issue Time Tracking
---

Worklog Id: (was: 354769)
Time Spent: 9h 10m  (was: 9h)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-12-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=354773=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-354773
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 05/Dec/19 23:18
Start Date: 05/Dec/19 23:18
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r354595740
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,49 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+class StateBackedTestElementType(object):
+  element_count = 0
+
+  def __init__(self, num_elems):
+    self.num_elems = num_elems
+    self.value = ['a' for _ in range(num_elems)]
+    StateBackedTestElementType.element_count += 1
+    # Due to using state backed iterable, we expect there is a few instances
+    # alive at any given time.
+    if StateBackedTestElementType.element_count > 5:
+      raise RuntimeError('Too many live instances.')
+
+  def __del__(self):
+    StateBackedTestElementType.element_count -= 1
+
+  def __reduce__(self):
+    return (self.__class__, (self.num_elems, ))
+
+@attr('ValidatesRunner')
+class FnApiBasedStateBackedCoderTest(unittest.TestCase):
+
+  class ElementDoFn(beam.DoFn):
 
 Review comment:
   As before, if there's nothing but a process method, use `Map`. 
   
   Also, it'd be good to give it a meaningful name, like sum_sizes or similar. 
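Below is a sketch of the suggested refactor, assuming each grouped element exposes `num_elems` as in the diff. It is a named one-in, one-out function that would be applied with `beam.Map` after the `GroupByKey`. The name `sum_sizes` follows the reviewer's suggestion, and the pipeline wiring appears only in a comment.

```python
def sum_sizes(key_and_elements):
    """Sum `num_elems` over every grouped element for one key.

    Intended as a one-in, one-out step after GroupByKey, e.g.
        grouped | 'SumSizes' >> beam.Map(sum_sizes)
    replacing a DoFn whose only method is process().
    """
    unused_key, elements = key_and_elements
    # A generator works here and also accepts a lazily paged iterable.
    return sum(element.num_elems for element in elements)
```

`Map` emits exactly one output per input. A `DoFn.process` method, by contrast, must return or yield an iterable of outputs. Returning a `(key, total)` tuple from `process`, as the earlier `MyDoFn` revision does, therefore emits the key and the total as two separate elements.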
 


Issue Time Tracking
---

Worklog Id: (was: 354773)
Time Spent: 9h 20m  (was: 9h 10m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-12-04 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=353712=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-353712
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 04/Dec/19 19:39
Start Date: 04/Dec/19 19:39
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10143: [BEAM-8645] To test 
state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#issuecomment-561806486
 
 
   Updated. Please take another look. 
 


Issue Time Tracking
---

Worklog Id: (was: 353712)
Time Spent: 9h  (was: 8h 50m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-12-04 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=353709=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-353709
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 04/Dec/19 19:38
Start Date: 04/Dec/19 19:38
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r353942935
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,27 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+@attr('ValidatesRunner')
+class FnApiBasedStateBackedCoderTest(unittest.TestCase):
+  def create_pipeline(self):
+    return beam.Pipeline(
+        runner=fn_api_runner.FnApiRunner(use_state_iterables=True))
+
+  def test_state_backed_coder(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, gbk_result):
+        value_list = gbk_result[1]
+        return (gbk_result[0], sum(value_list))
+
+    with self.create_pipeline() as p:
+      # The number of integers could be a knob to test against
+      # different runners' default settings on page size.
+      main = (p | 'main' >> beam.Create([('a', 1) for _ in range(0, 2)])
+              | 'GBK' >> beam.GroupByKey()
+              | 'Sum' >> beam.ParDo(MyDoFn()))
 
 Review comment:
   Thanks. Updated based on the suggestions above.  
 


Issue Time Tracking
---

Worklog Id: (was: 353709)
Time Spent: 8h 50m  (was: 8h 40m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-12-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=352265=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-352265
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 02/Dec/19 22:17
Start Date: 02/Dec/19 22:17
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10081: [BEAM-8645] 
A test case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081
 
 
   
 


Issue Time Tracking
---

Worklog Id: (was: 352265)
Time Spent: 8h 40m  (was: 8.5h)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-12-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=352213=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-352213
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 02/Dec/19 21:16
Start Date: 02/Dec/19 21:16
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10081: [BEAM-8645] A test 
case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#issuecomment-560584323
 
 
   @robertwb  ping for merging this PR
 


Issue Time Tracking
---

Worklog Id: (was: 352213)
Time Spent: 8.5h  (was: 8h 20m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-24 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=348703=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-348703
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 24/Nov/19 12:44
Start Date: 24/Nov/19 12:44
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10081: [BEAM-8645] A test 
case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#issuecomment-557884974
 
 
   Thanks for un-skipping the test.
   
   All tests now pass. This PR is ready to be merged.
 


Issue Time Tracking
---

Worklog Id: (was: 348703)
Time Spent: 8h 20m  (was: 8h 10m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-23 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=348521=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-348521
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 23/Nov/19 14:26
Start Date: 23/Nov/19 14:26
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10081: [BEAM-8645] A test 
case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#issuecomment-557802123
 
 
   Run Portable_Python PreCommit
 


Issue Time Tracking
---

Worklog Id: (was: 348521)
Time Spent: 8h 10m  (was: 8h)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=347805=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-347805
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 21/Nov/19 23:54
Start Date: 21/Nov/19 23:54
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r349376331
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,27 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+@attr('ValidatesRunner')
+class FnApiBasedStateBackedCoderTest(unittest.TestCase):
+  def create_pipeline(self):
+    return beam.Pipeline(
+        runner=fn_api_runner.FnApiRunner(use_state_iterables=True))
+
+  def test_state_backed_coder(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, gbk_result):
+        value_list = gbk_result[1]
+        return (gbk_result[0], sum(value_list))
+
+    with self.create_pipeline() as p:
+      # The number of integers could be a knob to test against
+      # different runners' default settings on page size.
+      main = (p | 'main' >> beam.Create([('a', 1) for _ in range(0, 2)])
+              | 'GBK' >> beam.GroupByKey()
+              | 'Sum' >> beam.ParDo(MyDoFn()))
 
 Review comment:
   So what you'd want to do is create some class with custom pickling 
(implement `__reduce__`). Its pickled form would be artificially large, e.g. 
include a dummy `'a' * 1000` value, as many runners trigger on serialized size, 
not element count.
   
   In the constructor, you would increment a class-level variable to indicate 
how many are alive. In the destructor (`__del__`), you would decrement it. An 
error would be thrown if there are too many. 
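Below is a minimal sketch of the live-instance guard described above. `LiveCountedElement` and the `MAX_LIVE` threshold of 5 are illustrative assumptions. The decrement relies on `__del__` running as soon as the last reference disappears. CPython's reference counting provides that, but the Python language does not guarantee it.

```python
class LiveCountedElement(object):
    """Counts live instances and fails fast when too many exist at once."""

    live_count = 0
    MAX_LIVE = 5  # assumed bound on elements materialized at the same time

    def __init__(self, size):
        self.size = size
        cls = type(self)
        cls.live_count += 1
        # The counter is bumped before the check, so a rejected instance stays
        # counted until it is collected; its __del__ then decrements it.
        if cls.live_count > cls.MAX_LIVE:
            raise RuntimeError('Too many live instances: %d' % cls.live_count)

    def __del__(self):
        # Runs when the object is reclaimed (immediately in CPython once the
        # last reference goes away).
        type(self).live_count -= 1
```

In a state-backed test, the guard would sit on the element type, so a runner that materializes the whole grouped iterable at once trips the `RuntimeError`.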
 


Issue Time Tracking
---

Worklog Id: (was: 347805)
Time Spent: 8h  (was: 7h 50m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=347761=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-347761
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 21/Nov/19 22:42
Start Date: 21/Nov/19 22:42
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10081: [BEAM-8645] 
A test case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#discussion_r349355406
 
 

 ##
 File path: sdks/python/apache_beam/transforms/combiners_test.py
 ##
 @@ -480,6 +486,74 @@ def test_with_input_types_decorator_violation(self):
 pc = p | Create(l_3_tuple)
 _ = pc | beam.CombineGlobally(self.fn)
 
+#
+# Test cases for streaming.
+#
+@attr('ValidatesRunner')
+class TimestampCombinerTest(unittest.TestCase):
+
+  @unittest.skip('BEAM-8657')
+  def test_combiner_earliest(self):
+    """Test TimestampCombiner with EARLIEST."""
+    options = PipelineOptions(streaming=True)
+    with TestPipeline(options=options) as p:
+      result = (p
+                | TestStream()
+                .add_elements([window.TimestampedValue(('k', 100), 2)])
+                .add_elements([window.TimestampedValue(('k', 400), 7)])
+                .advance_watermark_to_infinity()
+                | beam.WindowInto(
+                    window.FixedWindows(10),
+                    timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)
+                | beam.CombinePerKey(sum))
+
+      records = (result
+                 | beam.Map(lambda e, ts=beam.DoFn.TimestampParam: (e, ts)))
+
+      # All the KV pairs are applied GBK using EARLIEST timestamp for the same
+      # key.
+      expected_window_to_elements = {
+          window.IntervalWindow(0, 10): [
+              (('k', 500), Timestamp(2)),
+          ],
+      }
+
+      assert_that(
+          records,
+          equal_to_per_window(expected_window_to_elements),
+          use_global_window=False,
+          label='assert per window')
+
+  def test_combiner_latest(self):
+    """Test TimestampCombiner with LATEST."""
+    options = PipelineOptions(streaming=True)
+    with TestPipeline(options=options) as p:
+      result = (p
+                | TestStream()
+                .add_elements([window.TimestampedValue(('k', 100), 2)])
+                .add_elements([window.TimestampedValue(('k', 400), 7)])
 
 Review comment:
   Turns out this happens to work because 7 is the last element.
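One reading of this comment: with inputs at timestamps 2 and then 7, an implementation that keeps the timestamp of the last element to arrive behaves exactly like one that keeps the latest timestamp. The hypothetical helpers below show that the two diverge only when the later-timestamped element arrives first. A more robust LATEST test would therefore add the inputs out of timestamp order, for example the element at 7 before the element at 2.

```python
def latest_by_arrival(timestamps):
    """Buggy stand-in: keeps the timestamp of the last element to arrive."""
    return timestamps[-1]


def latest_by_timestamp(timestamps):
    """OUTPUT_AT_LATEST semantics: the maximum timestamp, whatever the order."""
    return max(timestamps)
```

With `[2, 7]` both helpers return 7. With `[7, 2]`, `latest_by_arrival` returns 2 while `latest_by_timestamp` still returns 7.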
 


Issue Time Tracking
---

Worklog Id: (was: 347761)
Time Spent: 7h 50m  (was: 7h 40m)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 7h 50m
>  Remaining Estimate: 0h
>
> When we combine TimestampedValue elements:
> {code:python}
> main_stream = (p
>     | 'main TestStream' >> TestStream()
>     .add_elements([window.TimestampedValue(('k', 100), 0)])
>     .add_elements([window.TimestampedValue(('k', 400), 9)])
>     .advance_watermark_to_infinity()
>     | 'main windowInto' >> beam.WindowInto(
>         window.FixedWindows(10),
>         timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
>     | 'Combine' >> beam.CombinePerKey(sum))
> {code}
> The expected output for each timestamp_combiner setting is:
> LATEST:        (('k', 500), Timestamp(9)),
> EARLIEST:      (('k', 500), Timestamp(0)),
> END_OF_WINDOW: (('k', 500), Timestamp(10)),
> But the current Python streaming implementation gives the following results:
> LATEST:        (('k', 500), Timestamp(10)),
> EARLIEST:      (('k', 500), Timestamp(10)),
> END_OF_WINDOW: (('k', 500), Timestamp(9.)),
> More details and discussion:
> https://lists.apache.org/thread.html/d3af1f2f84a2e59a747196039eae77812b78a991f0f293c717e5f4e1@%3Cdev.beam.apache.org%3E
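To make the expected values in the report concrete, here is a minimal pure-Python model of the three policies for one key in one fixed window, using the report's inputs (100 at t=0 and 400 at t=9 in FixedWindows(10)). It involves no Beam, and the function and constant names are illustrative. It assumes that, as I understand Beam's Python SDK to do, the end-of-window policy uses the window's max timestamp (one microsecond before the window end) rather than the end itself. This is why the END_OF_WINDOW value depends on that convention. Timestamps are kept as integer microseconds to avoid float rounding.

```python
# Hypothetical model of TimestampCombiner policies; not Beam's implementation.
MICROS = 1_000_000

EARLIEST = "OUTPUT_AT_EARLIEST"
LATEST = "OUTPUT_AT_LATEST"
END_OF_WINDOW = "OUTPUT_AT_EOW"


def output_timestamp_micros(policy, element_ts_micros, window_end_micros):
    """Return the output timestamp (in microseconds) of one combined result."""
    if policy == EARLIEST:
        return min(element_ts_micros)
    if policy == LATEST:
        return max(element_ts_micros)
    if policy == END_OF_WINDOW:
        # Assumed convention: the window's max timestamp, i.e. end - 1 micro.
        return window_end_micros - 1
    raise ValueError("unknown policy: %r" % policy)


# Inputs from the report: 100 at t=0 and 400 at t=9, window [0, 10).
timestamps = [0 * MICROS, 9 * MICROS]
window_end = 10 * MICROS
combined_value = sum([100, 400])  # what CombinePerKey(sum) would emit

for policy in (EARLIEST, LATEST, END_OF_WINDOW):
    ts = output_timestamp_micros(policy, timestamps, window_end)
    print(policy, combined_value, ts / MICROS)
```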



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=347693=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-347693
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 21/Nov/19 20:56
Start Date: 21/Nov/19 20:56
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10081: [BEAM-8645] A test 
case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#issuecomment-557268466
 
 
   Updated. 
 



Issue Time Tracking
---

Worklog Id: (was: 347693)
Time Spent: 7h 40m  (was: 7.5h)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=347629=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-347629
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 21/Nov/19 19:18
Start Date: 21/Nov/19 19:18
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10081: [BEAM-8645] 
A test case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#discussion_r349271092
 
 

 ##
 File path: sdks/python/apache_beam/transforms/combiners_test.py
 ##
 @@ -480,6 +486,76 @@ def test_with_input_types_decorator_violation(self):
 pc = p | Create(l_3_tuple)
 _ = pc | beam.CombineGlobally(self.fn)
 
+#
+# Test cases for streaming.
+#
+@attr('ValidatesRunner')
+class TimestampCombinerTest(unittest.TestCase):
+
+  class RecordFn(beam.DoFn):
+    def process(self, elm=beam.DoFn.ElementParam, ts=beam.DoFn.TimestampParam):
+      yield (elm, ts)
+
+  @unittest.skip('BEAM-8657')
+  def test_combiner_earliest(self):
+    """Test TimestampCombiner with EARLIEST."""
+    options = PipelineOptions(streaming=True)
+    with TestPipeline(options=options) as p:
+      result = (p
+                | 'main TestStream' >> TestStream()
 
 Review comment:
   Thanks!  Will double check the style on all the other pending PRs as well. 
 



Issue Time Tracking
---

Worklog Id: (was: 347629)
Time Spent: 7.5h  (was: 7h 20m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=347623=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-347623
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 21/Nov/19 19:14
Start Date: 21/Nov/19 19:14
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10081: [BEAM-8645] 
A test case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#discussion_r349269299
 
 

 ##
 File path: sdks/python/apache_beam/transforms/combiners_test.py
 ##
 @@ -480,6 +486,76 @@ def test_with_input_types_decorator_violation(self):
 pc = p | Create(l_3_tuple)
 _ = pc | beam.CombineGlobally(self.fn)
 
+#
+# Test cases for streaming.
+#
+@attr('ValidatesRunner')
+class TimestampCombinerTest(unittest.TestCase):
+
+  class RecordFn(beam.DoFn):
+    def process(self, elm=beam.DoFn.ElementParam, ts=beam.DoFn.TimestampParam):
+      yield (elm, ts)
+
+  @unittest.skip('BEAM-8657')
+  def test_combiner_earliest(self):
+    """Test TimestampCombiner with EARLIEST."""
+    options = PipelineOptions(streaming=True)
+    with TestPipeline(options=options) as p:
+      result = (p
+                | 'main TestStream' >> TestStream()
+                .add_elements([window.TimestampedValue(('k', 100), 2)])
+                .add_elements([window.TimestampedValue(('k', 400), 7)])
+                .advance_watermark_to_infinity()
+                | 'main windowInto' >> beam.WindowInto(
+                    window.FixedWindows(10),
+                    timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)
+                | 'Combine' >> beam.CombinePerKey(sum))
+
+      records = (result | beam.ParDo(self.RecordFn()))
 
 Review comment:
   Ah. Thanks for the tips. :-D 
 



Issue Time Tracking
---

Worklog Id: (was: 347623)
Time Spent: 7h 20m  (was: 7h 10m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=347582=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-347582
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 21/Nov/19 18:22
Start Date: 21/Nov/19 18:22
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10081: [BEAM-8645] 
A test case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#discussion_r349244418
 
 

 ##
 File path: sdks/python/apache_beam/transforms/combiners_test.py
 ##
 @@ -480,6 +486,76 @@ def test_with_input_types_decorator_violation(self):
 pc = p | Create(l_3_tuple)
 _ = pc | beam.CombineGlobally(self.fn)
 
+#
+# Test cases for streaming.
+#
+@attr('ValidatesRunner')
+class TimestampCombinerTest(unittest.TestCase):
+
+  class RecordFn(beam.DoFn):
+    def process(self, elm=beam.DoFn.ElementParam, ts=beam.DoFn.TimestampParam):
+      yield (elm, ts)
+
+  @unittest.skip('BEAM-8657')
+  def test_combiner_earliest(self):
+    """Test TimestampCombiner with EARLIEST."""
+    options = PipelineOptions(streaming=True)
+    with TestPipeline(options=options) as p:
+      result = (p
+                | 'main TestStream' >> TestStream()
+                .add_elements([window.TimestampedValue(('k', 100), 2)])
+                .add_elements([window.TimestampedValue(('k', 400), 7)])
+                .advance_watermark_to_infinity()
+                | 'main windowInto' >> beam.WindowInto(
+                    window.FixedWindows(10),
+                    timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)
+                | 'Combine' >> beam.CombinePerKey(sum))
+
+      records = (result | beam.ParDo(self.RecordFn()))
 
 Review comment:
   As before, a full DoFn is not needed here; just write `beam.Map(lambda e, ts=beam.DoFn.TimestampParam: (e, ts))`.
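The suggested `beam.Map(lambda e, ts=beam.DoFn.TimestampParam: (e, ts))` works because, as I understand it, Beam inspects the default values of the callable's arguments and substitutes runtime values for special sentinels such as `TimestampParam`. The sketch below re-creates that idea using only `inspect`, with hypothetical stand-ins (`TIMESTAMP_PARAM`, `call_with_params`). It illustrates the mechanism and is not Beam's actual code.

```python
import inspect


class _Sentinel:
    """Hypothetical marker object standing in for beam.DoFn.TimestampParam."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return self.name


TIMESTAMP_PARAM = _Sentinel("TimestampParam")


def call_with_params(fn, element, timestamp):
    """Call fn(element, ...), replacing sentinel defaults with runtime values."""
    kwargs = {}
    for name, param in inspect.signature(fn).parameters.items():
        if param.default is TIMESTAMP_PARAM:
            kwargs[name] = timestamp
    return fn(element, **kwargs)


# Same shape as the lambda suggested in the review, using the stand-in sentinel.
record = lambda e, ts=TIMESTAMP_PARAM: (e, ts)

print(call_with_params(record, ('k', 500), 2))
```

Only the framework replaces the sentinel. Calling `record('x')` directly returns the sentinel itself, which is why this pattern only makes sense inside a Map or ParDo.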
 



Issue Time Tracking
---

Worklog Id: (was: 347582)
Time Spent: 6h 50m  (was: 6h 40m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=347584=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-347584
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 21/Nov/19 18:22
Start Date: 21/Nov/19 18:22
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10081: [BEAM-8645] 
A test case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#discussion_r349244835
 
 

 ##
 File path: sdks/python/apache_beam/transforms/combiners_test.py
 ##
 @@ -480,6 +486,76 @@ def test_with_input_types_decorator_violation(self):
 pc = p | Create(l_3_tuple)
 _ = pc | beam.CombineGlobally(self.fn)
 
+#
+# Test cases for streaming.
+#
+@attr('ValidatesRunner')
+class TimestampCombinerTest(unittest.TestCase):
+
+  class RecordFn(beam.DoFn):
+    def process(self, elm=beam.DoFn.ElementParam, ts=beam.DoFn.TimestampParam):
+      yield (elm, ts)
+
+  @unittest.skip('BEAM-8657')
+  def test_combiner_earliest(self):
+    """Test TimestampCombiner with EARLIEST."""
+    options = PipelineOptions(streaming=True)
+    with TestPipeline(options=options) as p:
+      result = (p
+                | 'main TestStream' >> TestStream()
 
 Review comment:
   Omit the `label >>` prefixes unless they are needed for uniqueness or clarity. These labels don't add anything to what the default labels provide. 
 



Issue Time Tracking
---

Worklog Id: (was: 347584)
Time Spent: 7h 10m  (was: 7h)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=347583=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-347583
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 21/Nov/19 18:22
Start Date: 21/Nov/19 18:22
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10081: [BEAM-8645] 
A test case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#discussion_r349244498
 
 

 ##
 File path: sdks/python/apache_beam/transforms/combiners_test.py
 ##
 @@ -480,6 +486,76 @@ def test_with_input_types_decorator_violation(self):
 pc = p | Create(l_3_tuple)
 _ = pc | beam.CombineGlobally(self.fn)
 
+#
+# Test cases for streaming.
+#
+@attr('ValidatesRunner')
+class TimestampCombinerTest(unittest.TestCase):
+
+  class RecordFn(beam.DoFn):
+    def process(self, elm=beam.DoFn.ElementParam, ts=beam.DoFn.TimestampParam):
+      yield (elm, ts)
+
+  @unittest.skip('BEAM-8657')
+  def test_combiner_earliest(self):
+    """Test TimestampCombiner with EARLIEST."""
+    options = PipelineOptions(streaming=True)
+    with TestPipeline(options=options) as p:
+      result = (p
+                | 'main TestStream' >> TestStream()
+                .add_elements([window.TimestampedValue(('k', 100), 2)])
+                .add_elements([window.TimestampedValue(('k', 400), 7)])
+                .advance_watermark_to_infinity()
+                | 'main windowInto' >> beam.WindowInto(
+                    window.FixedWindows(10),
+                    timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)
+                | 'Combine' >> beam.CombinePerKey(sum))
+
+      records = (result | beam.ParDo(self.RecordFn()))
 
 Review comment:
   Also, no need for the surrounding ()'s.
 



Issue Time Tracking
---

Worklog Id: (was: 347583)
Time Spent: 7h  (was: 6h 50m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346492=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346492
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 20/Nov/19 06:33
Start Date: 20/Nov/19 06:33
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r348309394
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,27 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+@attr('ValidatesRunner')
+class FnApiBasedStateBackedCoderTest(unittest.TestCase):
+  def create_pipeline(self):
+    return beam.Pipeline(
+        runner=fn_api_runner.FnApiRunner(use_state_iterables=True))
+
+  def test_state_backed_coder(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, gbk_result):
+        value_list = gbk_result[1]
+        return (gbk_result[0], sum(value_list))
+
+    with self.create_pipeline() as p:
+      # The number of integers could be a knob to test against
+      # different runners' default settings on page size.
+      main = (p | 'main' >> beam.Create([('a', 1) for _ in range(0, 2)])
+              | 'GBK' >> beam.GroupByKey()
+              | 'Sum' >> beam.ParDo(MyDoFn()))
 
 Review comment:
   Maybe we can have both. 
   
   Would you please explain how to keep some values 'alive' while others in the same iterable are not? 
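One way to approach this question: a state-backed iterable is, roughly, fetched and decoded page by page as the DoFn iterates over it. Which values are alive is therefore governed by the page size and by how the DoFn consumes the iterable, not by individual values. Streaming consumption such as `sum(values)` keeps about one page alive at a time, while `list(values)` keeps everything alive. The sketch below models this in plain Python. `Tracked`, `PAGE_SIZE`, `lazy_pages`, and the two `consume_*` helpers are hypothetical, and the live-instance counts rely on CPython's reference counting calling `__del__` promptly.

```python
class Tracked:
    """Stand-in element that counts how many instances are alive."""

    live = 0
    peak = 0

    def __init__(self, value):
        self.value = value
        Tracked.live += 1
        Tracked.peak = max(Tracked.peak, Tracked.live)

    def __del__(self):
        Tracked.live -= 1


PAGE_SIZE = 3  # hypothetical number of values decoded per state fetch


def lazy_pages(total):
    """Yield `total` elements, decoding one page at a time on demand."""
    for start in range(0, total, PAGE_SIZE):
        page = [Tracked(1) for _ in range(start, min(start + PAGE_SIZE, total))]
        yield from page
        del page  # release this page before decoding the next one


def reset_counters():
    Tracked.live = 0
    Tracked.peak = 0


def consume_lazily(total):
    """Like sum(values) in a DoFn: roughly one page alive at a time."""
    reset_counters()
    result = sum(item.value for item in lazy_pages(total))
    return result, Tracked.peak


def consume_materialized(total):
    """Like list(values) in a DoFn: every value is alive at once."""
    reset_counters()
    items = list(lazy_pages(total))
    result = sum(item.value for item in items)
    return result, Tracked.peak


print(consume_lazily(10))
print(consume_materialized(10))
```

A test built this way can therefore bound the number of live instances by a small multiple of the page size, as long as the DoFn iterates over the values instead of materializing them.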
 



Issue Time Tracking
---

Worklog Id: (was: 346492)
Time Spent: 6h 40m  (was: 6.5h)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346490=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346490
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 20/Nov/19 06:28
Start Date: 20/Nov/19 06:28
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r348309394
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,27 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+@attr('ValidatesRunner')
+class FnApiBasedStateBackedCoderTest(unittest.TestCase):
+  def create_pipeline(self):
+    return beam.Pipeline(
+        runner=fn_api_runner.FnApiRunner(use_state_iterables=True))
+
+  def test_state_backed_coder(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, gbk_result):
+        value_list = gbk_result[1]
+        return (gbk_result[0], sum(value_list))
+
+    with self.create_pipeline() as p:
+      # The number of integers could be a knob to test against
+      # different runners' default settings on page size.
+      main = (p | 'main' >> beam.Create([('a', 1) for _ in range(0, 2)])
+              | 'GBK' >> beam.GroupByKey()
+              | 'Sum' >> beam.ParDo(MyDoFn()))
 
 Review comment:
   Maybe we can keep both. 
   
   Would you please explain how to keep some values 'alive' while others in the same iterable are not? 
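A separate point about the hunk above: `MyDoFn.process` returns a bare tuple. Beam treats the return value of `process` as an iterable of output elements, so `('a', 2)` is emitted as two separate elements, `'a'` and `2`. That is consistent with the `equal_to(['a', 2])` assertion quoted elsewhere in this thread. The sketch below imitates this flattening in plain Python to compare `return` with `yield`. `run_pardo` is a hypothetical helper, not a Beam API.

```python
import itertools


def run_pardo(process, elements):
    """Hypothetical stand-in for ParDo: flatten whatever process() returns."""
    return list(itertools.chain.from_iterable(process(e) for e in elements))


def process_return_tuple(gbk_result):
    # Mirrors MyDoFn.process under review: returns a bare tuple.
    key, values = gbk_result
    return (key, sum(values))


def process_yield_tuple(gbk_result):
    # Emits exactly one (key, total) element per input.
    key, values = gbk_result
    yield (key, sum(values))


grouped = [('a', [1, 1])]  # what GroupByKey would produce for the test input
print(run_pardo(process_return_tuple, grouped))  # two elements: 'a' and 2
print(run_pardo(process_yield_tuple, grouped))   # one element: ('a', 2)
```

In the real test, either `yield (gbk_result[0], sum(value_list))` or `return [(gbk_result[0], sum(value_list))]` would produce a single `('a', 2)` element, and the expected value would then be `[('a', 2)]`.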
 



Issue Time Tracking
---

Worklog Id: (was: 346490)
Time Spent: 6.5h  (was: 6h 20m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346489=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346489
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 20/Nov/19 06:28
Start Date: 20/Nov/19 06:28
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r348309394
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,27 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+@attr('ValidatesRunner')
+class FnApiBasedStateBackedCoderTest(unittest.TestCase):
+  def create_pipeline(self):
+    return beam.Pipeline(
+        runner=fn_api_runner.FnApiRunner(use_state_iterables=True))
+
+  def test_state_backed_coder(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, gbk_result):
+        value_list = gbk_result[1]
+        return (gbk_result[0], sum(value_list))
+
+    with self.create_pipeline() as p:
+      # The number of integers could be a knob to test against
+      # different runners' default settings on page size.
+      main = (p | 'main' >> beam.Create([('a', 1) for _ in range(0, 2)])
+              | 'GBK' >> beam.GroupByKey()
+              | 'Sum' >> beam.ParDo(MyDoFn()))
 
 Review comment:
   Maybe let us keep both. 
   
   Would you please explain how to keep some values 'alive' while others in the same iterable are not? 
 



Issue Time Tracking
---

Worklog Id: (was: 346489)
Time Spent: 6h 20m  (was: 6h 10m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346399=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346399
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 20/Nov/19 01:18
Start Date: 20/Nov/19 01:18
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r348250056
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,27 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+@attr('ValidatesRunner')
+class FnApiBasedStateBackedCoderTest(unittest.TestCase):
+  def create_pipeline(self):
+    return beam.Pipeline(
+        runner=fn_api_runner.FnApiRunner(use_state_iterables=True))
+
+  def test_state_backed_coder(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, gbk_result):
+        value_list = gbk_result[1]
+        return (gbk_result[0], sum(value_list))
+
+    with self.create_pipeline() as p:
+      # The number of integers could be a knob to test against
+      # different runners' default settings on page size.
+      main = (p | 'main' >> beam.Create([('a', 1) for _ in range(0, 2)])
+              | 'GBK' >> beam.GroupByKey()
+              | 'Sum' >> beam.ParDo(MyDoFn()))
+
+    assert_that(main, equal_to(['a', 2]))
+    p.run()
 
 Review comment:
   No p.run needed when using the `with` statement. 
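The reason is that leaving a `with beam.Pipeline(...) as p:` block already runs the pipeline and waits for it to finish. An extra `p.run()` afterwards starts another run. A transform such as `assert_that` that is applied after the block exits is not part of the run the `with` statement already performed. The minimal sketch below uses a hypothetical `ToyPipeline` class (not Beam's class) that only models this run-on-exit behaviour.

```python
class ToyPipeline:
    """Hypothetical stand-in that runs itself when a `with` block exits."""

    def __init__(self):
        self.transforms = []
        self.runs = []  # snapshot of the transforms executed by each run

    def apply(self, name):
        self.transforms.append(name)

    def run(self):
        self.runs.append(list(self.transforms))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:  # only run if the block finished cleanly
            self.run()
        return False  # never swallow exceptions


# Shape of the code under review: assertion added after the block, then run().
with ToyPipeline() as p:
    p.apply('Create')
    p.apply('GroupByKey')
p.apply('assert_that')
p.run()
print(p.runs)  # two runs; only the second one includes assert_that

# Suggested shape: everything inside the block, no explicit run().
with ToyPipeline() as q:
    q.apply('Create')
    q.apply('GroupByKey')
    q.apply('assert_that')
print(q.runs)  # a single run that includes assert_that
```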
 



Issue Time Tracking
---

Worklog Id: (was: 346399)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> When TimestampedValues are combined:
> {code:python}
> main_stream = (p
>                | 'main TestStream' >> TestStream()
>                .add_elements([window.TimestampedValue(('k', 100), 0)])
>                .add_elements([window.TimestampedValue(('k', 400), 9)])
>                .advance_watermark_to_infinity()
>                | 'main windowInto' >> beam.WindowInto(
>                    window.FixedWindows(10),
>                    timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
>                | 'Combine' >> beam.CombinePerKey(sum))
> {code}
> The expected timestamps for each TimestampCombiner setting are:
> LATEST:        (('k', 500), Timestamp(9)),
> EARLIEST:      (('k', 500), Timestamp(0)),
> END_OF_WINDOW: (('k', 500), Timestamp(10)),
> But the current Python streaming implementation gives the following results:
> LATEST:        (('k', 500), Timestamp(10)),
> EARLIEST:      (('k', 500), Timestamp(10)),
> END_OF_WINDOW: (('k', 500), Timestamp(9.)),
> More details and discussions:
> https://lists.apache.org/thread.html/d3af1f2f84a2e59a747196039eae77812b78a991f0f293c717e5f4e1@%3Cdev.beam.apache.org%3E
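To make the EARLIEST and LATEST expectations concrete: with `OUTPUT_AT_EARLIEST` the combined element should carry the minimum input timestamp in its window, and with `OUTPUT_AT_LATEST` the maximum. The plain-Python sketch below computes those values for the two inputs in the issue (timestamps 0 and 9, both in window [0, 10)) and for the 2 and 7 used in a later revision of the test. It deliberately models only these two policies; `expected_output_timestamp` is a hypothetical helper, not part of Beam.

```python
def expected_output_timestamp(policy, timestamps):
    """Hypothetical helper: timestamp a combined element should carry.

    Only the EARLIEST and LATEST policies are modelled here.
    """
    if policy == 'OUTPUT_AT_EARLIEST':
        return min(timestamps)
    if policy == 'OUTPUT_AT_LATEST':
        return max(timestamps)
    raise ValueError('unsupported policy: %s' % policy)


inputs = [(100, 0), (400, 9)]  # (value, event timestamp) pairs for key 'k'
combined_value = sum(value for value, _ in inputs)  # CombinePerKey(sum) -> 500
timestamps = [ts for _, ts in inputs]

for policy in ('OUTPUT_AT_EARLIEST', 'OUTPUT_AT_LATEST'):
    print(policy, (('k', combined_value), expected_output_timestamp(policy, timestamps)))
# OUTPUT_AT_EARLIEST (('k', 500), 0)
# OUTPUT_AT_LATEST (('k', 500), 9)

print(expected_output_timestamp('OUTPUT_AT_EARLIEST', [2, 7]))  # 2
```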





[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346402=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346402
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 20/Nov/19 01:18
Start Date: 20/Nov/19 01:18
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r348249550
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,27 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+@attr('ValidatesRunner')
+class FnApiBasedStateBackedCoderTest(unittest.TestCase):
+  def create_pipeline(self):
+    return beam.Pipeline(
+        runner=fn_api_runner.FnApiRunner(use_state_iterables=True))
+
+  def test_state_backed_coder(self):
 
 Review comment:
   I would call this something like test_gbk_many_values or similar. 
 



Issue Time Tracking
---

Worklog Id: (was: 346402)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h





[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346401=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346401
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 20/Nov/19 01:18
Start Date: 20/Nov/19 01:18
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r348250260
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,27 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+@attr('ValidatesRunner')
+class FnApiBasedStateBackedCoderTest(unittest.TestCase):
+  def create_pipeline(self):
+    return beam.Pipeline(
+        runner=fn_api_runner.FnApiRunner(use_state_iterables=True))
+
+  def test_state_backed_coder(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, gbk_result):
+        value_list = gbk_result[1]
+        return (gbk_result[0], sum(value_list))
+
+    with self.create_pipeline() as p:
+      # The number of integers could be a knob to test against
+      # different runners' default settings on page size.
+      main = (p | 'main' >> beam.Create([('a', 1) for _ in range(0, 2)])
+              | 'GBK' >> beam.GroupByKey()
+              | 'Sum' >> beam.ParDo(MyDoFn()))
 
 Review comment:
   Actually, a better test would be to ensure no more than N (for some value of 
N < number of elements) instances of the value type are alive at any given 
moment. 
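One way to implement the suggested check is a value type that counts its live instances and fails once more than N exist. The sketch below is plain Python and only illustrates the idea: `LiveCounted`, `lazy_values`, `eager_values` and the limit `max_live = 5` are hypothetical, and the prompt decrement in `__del__` relies on CPython's reference counting (other interpreters may finalize objects later).

```python
class LiveCounted(object):
    """Hypothetical value type that fails if too many instances are alive."""
    live = 0      # instances currently alive
    peak = 0      # highest value `live` has reached
    max_live = 5  # hypothetical N, well below the number of elements

    def __init__(self, n):
        self.n = n
        LiveCounted.live += 1
        LiveCounted.peak = max(LiveCounted.peak, LiveCounted.live)
        if LiveCounted.live > LiveCounted.max_live:
            raise RuntimeError('Too many live instances.')

    def __del__(self):
        # Runs as soon as the last reference goes away (CPython refcounting).
        LiveCounted.live -= 1


def lazy_values(count):
    # Produces values one at a time, so earlier ones can be freed.
    for i in range(count):
        yield LiveCounted(i)


def eager_values(count):
    # Materializes every value at once, like a fully in-memory list.
    return [LiveCounted(i) for i in range(count)]


def consume(values):
    total = 0
    for v in values:
        total += v.n
    return total


print(consume(lazy_values(100)))  # 4950, with only a couple of instances alive at once
```

Consuming the lazy source keeps the live count small, while `eager_values(100)` raises `RuntimeError` as soon as the sixth instance is created. In the Beam test, the lazy case roughly corresponds to a state-backed iterable paging values in, and the eager case to materializing the whole group in memory.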
 



Issue Time Tracking
---

Worklog Id: (was: 346401)
Time Spent: 6h 10m  (was: 6h)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h





[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346400=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346400
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 20/Nov/19 01:18
Start Date: 20/Nov/19 01:18
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r348249978
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,27 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+@attr('ValidatesRunner')
+class FnApiBasedStateBackedCoderTest(unittest.TestCase):
+  def create_pipeline(self):
+    return beam.Pipeline(
+        runner=fn_api_runner.FnApiRunner(use_state_iterables=True))
+
+  def test_state_backed_coder(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, gbk_result):
+        value_list = gbk_result[1]
+        return (gbk_result[0], sum(value_list))
+
+    with self.create_pipeline() as p:
+      # The number of integers could be a knob to test against
+      # different runners' default settings on page size.
+      main = (p | 'main' >> beam.Create([('a', 1) for _ in range(0, 2)])
 
 Review comment:
   Rather than creating all the values in memory, I'd create them with a DoFn, e.g.
   
   `beam.Create([None]) | beam.FlatMap(lambda x: ((x, 1) for _ in range(2))) | ...`
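The suggestion works because a `FlatMap` callable may return any iterable, including a generator, so the pairs are produced on demand instead of being held as a list by `beam.Create`. Below is that callable on its own in plain Python; `make_pairs` is a hypothetical name, and `count` plays the role of the `range(2)` knob in the test.

```python
import itertools
import types


def make_pairs(key, count):
    """Hypothetical FlatMap callable: lazily yields `count` (key, 1) pairs."""
    return ((key, 1) for _ in range(count))


pairs = make_pairs(None, 10 ** 12)  # nothing is materialized yet
print(isinstance(pairs, types.GeneratorType))  # True
print(list(itertools.islice(pairs, 3)))        # [(None, 1), (None, 1), (None, 1)]
```

In the pipeline this would be used roughly as `beam.Create([None]) | beam.FlatMap(lambda x: make_pairs(x, 2))`.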
 



Issue Time Tracking
---

Worklog Id: (was: 346400)
Time Spent: 6h  (was: 5h 50m)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 6h
>  Remaining Estimate: 0h





[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346398=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346398
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 20/Nov/19 01:18
Start Date: 20/Nov/19 01:18
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r348249763
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,27 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+@attr('ValidatesRunner')
+class FnApiBasedStateBackedCoderTest(unittest.TestCase):
+  def create_pipeline(self):
+    return beam.Pipeline(
+        runner=fn_api_runner.FnApiRunner(use_state_iterables=True))
+
+  def test_state_backed_coder(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, gbk_result):
+        value_list = gbk_result[1]
+        return (gbk_result[0], sum(value_list))
+
+    with self.create_pipeline() as p:
+      # The number of integers could be a knob to test against
+      # different runners' default settings on page size.
+      main = (p | 'main' >> beam.Create([('a', 1) for _ in range(0, 2)])
+              | 'GBK' >> beam.GroupByKey()
 
 Review comment:
   As before, no need to name the GBK (or others). 
 



Issue Time Tracking
---

Worklog Id: (was: 346398)
Time Spent: 5h 50m  (was: 5h 40m)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 5h 50m
>  Remaining Estimate: 0h





[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346397=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346397
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 20/Nov/19 01:18
Start Date: 20/Nov/19 01:18
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#discussion_r348249348
 
 

 ##
 File path: sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
 ##
 @@ -1579,6 +1580,27 @@ def test_lull_logging(self):
 '.*There has been a processing lull of over.*',
 'Unable to find a lull logged for this job.')
 
+@attr('ValidatesRunner')
+class FnApiBasedStateBackedCoderTest(unittest.TestCase):
+  def create_pipeline(self):
+    return beam.Pipeline(
+        runner=fn_api_runner.FnApiRunner(use_state_iterables=True))
+
+  def test_state_backed_coder(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, gbk_result):
+        value_list = gbk_result[1]
+        return (gbk_result[0], sum(value_list))
+
+    with self.create_pipeline() as p:
+      # The number of integers could be a knob to test against
+      # different runners' default settings on page size.
+      main = (p | 'main' >> beam.Create([('a', 1) for _ in range(0, 2)])
+              | 'GBK' >> beam.GroupByKey()
+              | 'Sum' >> beam.ParDo(MyDoFn()))
 
 Review comment:
   This could just be `beam.MapTuple(lambda key, values: sum(values))`.
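`beam.MapTuple` unpacks each tuple element into positional arguments before calling the function, which is what lets the lambda take `key, values` directly. The plain-Python emulation below shows those semantics; `map_tuple` is a hypothetical helper, not Beam's implementation. Note that the suggested lambda returns only the sum and drops the key; returning `(key, sum(values))` keeps the key alongside the total.

```python
def map_tuple(fn, elements):
    """Hypothetical emulation of beam.MapTuple: call fn(*element) per element."""
    return [fn(*element) for element in elements]


grouped = [('a', [1, 1]), ('b', [3, 4, 5])]  # (key, values) pairs after a GBK

print(map_tuple(lambda key, values: sum(values), grouped))         # [2, 12]
print(map_tuple(lambda key, values: (key, sum(values)), grouped))  # [('a', 2), ('b', 12)]
```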
 



Issue Time Tracking
---

Worklog Id: (was: 346397)
Time Spent: 5h 40m  (was: 5.5h)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 5h 40m
>  Remaining Estimate: 0h





[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346313=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346313
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 19/Nov/19 22:31
Start Date: 19/Nov/19 22:31
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10081: [BEAM-8645] A test 
case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#issuecomment-555747139
 
 
   Would you please point out which PR in master *might* resolve the issue? 
 I can follow it and trace that part a bit using this test case. 
 



Issue Time Tracking
---

Worklog Id: (was: 346313)
Time Spent: 5.5h  (was: 5h 20m)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 5.5h
>  Remaining Estimate: 0h





[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346305=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346305
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 19/Nov/19 22:26
Start Date: 19/Nov/19 22:26
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10081: [BEAM-8645] A test 
case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#issuecomment-555745530
 
 
   Pulled from master and retried. The EARLIEST case still failed with the 
error: 
   
   "apache_beam.runners.direct.executor: WARNING: A task failed with exception: 
Failed assert: [(('k', 500), Timestamp(7))] not in [(('k', 500), Timestamp(2))] 
[while running 'assert per window/Match']"
 



Issue Time Tracking
---

Worklog Id: (was: 346305)
Time Spent: 5h 20m  (was: 5h 10m)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 5h 20m
>  Remaining Estimate: 0h





[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346221=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346221
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 19/Nov/19 19:41
Start Date: 19/Nov/19 19:41
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10135: [BEAM-8645] 
Create a py test case for Re-iteration on GBK result. 
URL: https://github.com/apache/beam/pull/10135
 
 
   
 



Issue Time Tracking
---

Worklog Id: (was: 346221)
Time Spent: 5h 10m  (was: 5h)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 5h 10m
>  Remaining Estimate: 0h





[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346219=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346219
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 19/Nov/19 19:40
Start Date: 19/Nov/19 19:40
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10081: [BEAM-8645] 
A test case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#discussion_r348129108
 
 

 ##
 File path: sdks/python/apache_beam/transforms/combiners_test.py
 ##
 @@ -480,6 +487,82 @@ def test_with_input_types_decorator_violation(self):
 pc = p | Create(l_3_tuple)
 _ = pc | beam.CombineGlobally(self.fn)
 
+#
+# Test cases for streaming.
+#
+@attr('ValidatesRunner')
+class TimestampCombinerTest(unittest.TestCase):
+
+  class RecordFn(beam.DoFn):
+    def process(self, elm=beam.DoFn.ElementParam, ts=beam.DoFn.TimestampParam):
+      yield (elm, ts)
+
+  @unittest.skip('BEAM-8657')
+  def test_combiner_earliest(self):
+    """Test TimestampCombiner with EARLIEST."""
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+
+    main_stream = (p
+                   | 'main TestStream' >> TestStream()
+                   .add_elements([window.TimestampedValue(('k', 100), 0)])
 
 Review comment:
   Updated to be 2 and 7 respectively.
 



Issue Time Tracking
---

Worklog Id: (was: 346219)
Time Spent: 4h 50m  (was: 4h 40m)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 4h 50m
>  Remaining Estimate: 0h





[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346217=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346217
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 19/Nov/19 19:40
Start Date: 19/Nov/19 19:40
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10081: [BEAM-8645] 
A test case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#discussion_r348128997
 
 

 ##
 File path: sdks/python/apache_beam/transforms/combiners_test.py
 ##
 @@ -480,6 +487,82 @@ def test_with_input_types_decorator_violation(self):
 pc = p | Create(l_3_tuple)
 _ = pc | beam.CombineGlobally(self.fn)
 
+#
+# Test cases for streaming.
+#
+@attr('ValidatesRunner')
+class TimestampCombinerTest(unittest.TestCase):
+
+  class RecordFn(beam.DoFn):
+    def process(self, elm=beam.DoFn.ElementParam, ts=beam.DoFn.TimestampParam):
+      yield (elm, ts)
+
+  @unittest.skip('BEAM-8657')
+  def test_combiner_earliest(self):
+    """Test TimestampCombiner with EARLIEST."""
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+
+    main_stream = (p
+                   | 'main TestStream' >> TestStream()
+                   .add_elements([window.TimestampedValue(('k', 100), 0)])
+                   .add_elements([window.TimestampedValue(('k', 400), 9)])
+                   .advance_watermark_to_infinity()
+                   | 'main windowInto' >> beam.WindowInto(
+                       window.FixedWindows(10),
+                       timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)
+                   | 'Combine' >> beam.CombinePerKey(sum))
+
+    records = (main_stream | beam.ParDo(self.RecordFn()))
+
+    # All the KV pairs are applied GBK using EARLIEST timestamp.
+    expected_window_to_elements = {
+        window.IntervalWindow(0, 10): [
+            (('k', 500), Timestamp(0)),
+        ],
+    }
+
+    assert_that(
+        records,
+        equal_to_per_window(expected_window_to_elements),
+        use_global_window=False,
+        label='assert per window')
+
+    p.run()
+
+  def test_combiner_latest(self):
+    """Test TimestampCombiner with LATEST."""
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+
+    main_stream = (p
+                   | 'main TestStream' >> TestStream()
+                   .add_elements([window.TimestampedValue(('k', 100), 0)])
+                   .add_elements([window.TimestampedValue(('k', 400), 9)])
+                   .advance_watermark_to_infinity()
+                   | 'main windowInto' >> beam.WindowInto(
+                       window.FixedWindows(10),
+                       timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)
 
 Review comment:
   Oops. The result from OUTPUT_AT_EARLIEST is indeed wrong, which led to the 
false positive here. 
   
   Fixed. 
 



Issue Time Tracking
---

Worklog Id: (was: 346217)
Time Spent: 4h 40m  (was: 4.5h)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 4h 40m
>  Remaining Estimate: 0h

[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346220=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346220
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 19/Nov/19 19:40
Start Date: 19/Nov/19 19:40
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10135: [BEAM-8645] 
Create a py test case for Re-iteration on GBK result. 
URL: https://github.com/apache/beam/pull/10135#discussion_r348129164
 
 

 ##
 File path: sdks/python/apache_beam/transforms/ptransform_test.py
 ##
 @@ -471,6 +471,24 @@ def test_group_by_key(self):
 assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
 pipeline.run()
 
+  def test_group_by_key_reiteration(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, gbk_result):
+        value_list = gbk_result[1]
+        sum_val = 0
+        # Iterate the GBK result for multiple times.
+        for _ in range(0, 17):
+          sum_val += sum(value_list)
+        return (gbk_result[0], sum_val)
+
+    pipeline = TestPipeline()
+    pcoll = pipeline | 'start' >> beam.Create(
+        [(1, 1), (1, 2), (1, 3), (1, 4)])
+    result = (pcoll | 'Group' >> beam.GroupByKey()
+              | 'Reiteration-Sum' >> beam.ParDo(MyDoFn()))
+    assert_that(result, equal_to([1, 170]))
 
 Review comment:
   FYI, if your DoFn only has a process method, it's generally easier (and 
clearer) to use `beam.Map` or `beam.FlatMap` instead. 
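Here is a minimal sketch of that suggestion. It assumes the test only needs the per-key sum. The hypothetical `sum_reiterated` below would replace `MyDoFn` and be applied as `beam.Map(sum_reiterated)` after `beam.GroupByKey()`. It is plain Python, so it can be checked without a runner.

```python
def sum_reiterated(kv, passes=17):
  """Sum a grouped (key, values) pair by re-iterating `values` `passes` times.

  Hypothetical helper intended for beam.Map(sum_reiterated) after GroupByKey.
  beam.Map emits the returned tuple as a single element, so no list wrapper
  is needed (unlike a DoFn.process that returns a bare tuple).
  """
  key, values = kv
  total = 0
  for _ in range(passes):
    # Each pass starts a fresh iteration over the grouped values.
    total += sum(values)
  return key, total
```

`sum_reiterated((1, [1, 2, 3, 4]))` gives `(1, 170)`. Passing a one-shot iterator such as `iter([1, 2, 3, 4])` gives `(1, 10)` instead, because only the first pass sees any values. That is the kind of failure a re-iteration test is meant to catch.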
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 346220)
Time Spent: 5h  (was: 4h 50m)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 5h
>  Remaining Estimate: 0h
>
> When we combine TimestampedValue elements:
> {code:python}
> main_stream = (p
>                | 'main TestStream' >> TestStream()
>                .add_elements([window.TimestampedValue(('k', 100), 0)])
>                .add_elements([window.TimestampedValue(('k', 400), 9)])
>                .advance_watermark_to_infinity()
>                | 'main windowInto' >> beam.WindowInto(
>                    window.FixedWindows(10),
>                    timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
>                | 'Combine' >> beam.CombinePerKey(sum))
> The expected timestamps are:
> LATEST:        (('k', 500), Timestamp(9)),
> EARLIEST:      (('k', 500), Timestamp(0)),
> END_OF_WINDOW: (('k', 500), Timestamp(10)),
> But the current Python SDK in streaming mode gives the following results:
> LATEST:        (('k', 500), Timestamp(10)),
> EARLIEST:      (('k', 500), Timestamp(10)),
> END_OF_WINDOW: (('k', 500), Timestamp(9.)),
> More details and discussions:
> https://lists.apache.org/thread.html/d3af1f2f84a2e59a747196039eae77812b78a991f0f293c717e5f4e1@%3Cdev.beam.apache.org%3E
> {code}
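The following plain-Python model makes the expected EARLIEST and LATEST values concrete. It is not Beam code, and the names are hypothetical. It mimics `CombinePerKey(sum)` over `FixedWindows(10)`: each group's output timestamp is the minimum or maximum of its inputs' timestamps. END_OF_WINDOW is not modelled here.

```python
from collections import defaultdict

WINDOW_SIZE = 10  # mirrors window.FixedWindows(10) in the report


def window_start(ts, size=WINDOW_SIZE):
  """Start of the fixed window [start, start + size) containing ts."""
  return ts - ts % size


def combine_per_key(elements, combiner):
  """Sum values per (key, window) and choose one output timestamp per group.

  elements: iterable of ((key, value), timestamp) pairs.
  combiner: 'EARLIEST' or 'LATEST', hypothetical labels standing in for
  TimestampCombiner.OUTPUT_AT_EARLIEST / OUTPUT_AT_LATEST.
  """
  pick = {'EARLIEST': min, 'LATEST': max}[combiner]
  sums = defaultdict(int)
  stamps = defaultdict(list)
  for (key, value), ts in elements:
    group = (key, window_start(ts))
    sums[group] += value
    stamps[group].append(ts)
  return {g: ((g[0], total), pick(stamps[g])) for g, total in sums.items()}


# The two elements from the report: ('k', 100) at t=0 and ('k', 400) at t=9.
events = [(('k', 100), 0), (('k', 400), 9)]
```

Both elements land in the window starting at 0. EARLIEST therefore yields `(('k', 500), 0)` and LATEST yields `(('k', 500), 9)`, which matches the expectations listed in the issue.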



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346188=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346188
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 19/Nov/19 19:13
Start Date: 19/Nov/19 19:13
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10081: [BEAM-8645] 
A test case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#discussion_r348115055
 
 

 ##
 File path: sdks/python/apache_beam/transforms/combiners_test.py
 ##
 @@ -480,6 +487,82 @@ def test_with_input_types_decorator_violation(self):
 pc = p | Create(l_3_tuple)
 _ = pc | beam.CombineGlobally(self.fn)
 
+#
+# Test cases for streaming.
+#
+@attr('ValidatesRunner')
+class TimestampCombinerTest(unittest.TestCase):
+
+  class RecordFn(beam.DoFn):
+    def process(self, elm=beam.DoFn.ElementParam, ts=beam.DoFn.TimestampParam):
+      yield (elm, ts)
+
+  @unittest.skip('BEAM-8657')
+  def test_combiner_earliest(self):
+    """Test TimestampCombiner with EARLIEST."""
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
 
 Review comment:
   Here you can just write 
   
   `options = PipelineOptions(streaming=True)`
   
   Similarly below.
 



Issue Time Tracking
---

Worklog Id: (was: 346188)
Time Spent: 4h  (was: 3h 50m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346189=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346189
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 19/Nov/19 19:13
Start Date: 19/Nov/19 19:13
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10081: [BEAM-8645] 
A test case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#discussion_r348115559
 
 

 ##
 File path: sdks/python/apache_beam/transforms/combiners_test.py
 ##
 @@ -480,6 +487,82 @@ def test_with_input_types_decorator_violation(self):
 pc = p | Create(l_3_tuple)
 _ = pc | beam.CombineGlobally(self.fn)
 
+#
+# Test cases for streaming.
+#
+@attr('ValidatesRunner')
+class TimestampCombinerTest(unittest.TestCase):
+
+  class RecordFn(beam.DoFn):
+    def process(self, elm=beam.DoFn.ElementParam, ts=beam.DoFn.TimestampParam):
+      yield (elm, ts)
+
+  @unittest.skip('BEAM-8657')
+  def test_combiner_earliest(self):
+    """Test TimestampCombiner with EARLIEST."""
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
 
 Review comment:
   Prefer 
   
   ```
   with TestPipeline(options=options) as p:
     # ... pipeline construction ...
   ```
   
   over calling `p.run()` explicitly; the pipeline runs when the `with` block exits.
 



Issue Time Tracking
---

Worklog Id: (was: 346189)
Time Spent: 4h 10m  (was: 4h)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346191=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346191
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 19/Nov/19 19:13
Start Date: 19/Nov/19 19:13
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10081: [BEAM-8645] 
A test case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#discussion_r348115791
 
 

 ##
 File path: sdks/python/apache_beam/transforms/combiners_test.py
 ##
 @@ -480,6 +487,82 @@ def test_with_input_types_decorator_violation(self):
 pc = p | Create(l_3_tuple)
 _ = pc | beam.CombineGlobally(self.fn)
 
+#
+# Test cases for streaming.
+#
+@attr('ValidatesRunner')
+class TimestampCombinerTest(unittest.TestCase):
+
+  class RecordFn(beam.DoFn):
+    def process(self, elm=beam.DoFn.ElementParam, ts=beam.DoFn.TimestampParam):
+      yield (elm, ts)
+
+  @unittest.skip('BEAM-8657')
+  def test_combiner_earliest(self):
+    """Test TimestampCombiner with EARLIEST."""
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+
+    main_stream = (p
+                   | 'main TestStream' >> TestStream()
+                   .add_elements([window.TimestampedValue(('k', 100), 0)])
 
 Review comment:
   It'd probably be a stronger test if the timestamps weren't the endpoints, 
e.g. 2 and 7. (Similarly below.)
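This small plain-Python illustration shows why endpoint timestamps make a weak test. It uses a hypothetical buggy implementation that returns the window start instead of the earliest input timestamp. With inputs at 0 and 9 the bug is invisible; with inputs at 2 and 7 it is caught.

```python
WINDOW_SIZE = 10  # FixedWindows(10), as in the test under review


def correct_earliest(stamps):
  """EARLIEST: the minimum input timestamp in the window."""
  return min(stamps)


def buggy_earliest(stamps, size=WINDOW_SIZE):
  """Hypothetical bug: returns the start of the window instead."""
  first = min(stamps)
  return first - first % size


def bug_detected(stamps):
  """True if a test using these input timestamps would tell the two apart."""
  return correct_earliest(stamps) != buggy_earliest(stamps)
```

The same argument applies to LATEST, where an input at 9 sits right next to the window's end.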
 



Issue Time Tracking
---

Worklog Id: (was: 346191)
Time Spent: 4.5h  (was: 4h 20m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=346190=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-346190
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 19/Nov/19 19:13
Start Date: 19/Nov/19 19:13
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10081: [BEAM-8645] 
A test case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#discussion_r348114372
 
 

 ##
 File path: sdks/python/apache_beam/transforms/combiners_test.py
 ##
 @@ -480,6 +487,82 @@ def test_with_input_types_decorator_violation(self):
 pc = p | Create(l_3_tuple)
 _ = pc | beam.CombineGlobally(self.fn)
 
+#
+# Test cases for streaming.
+#
+@attr('ValidatesRunner')
+class TimestampCombinerTest(unittest.TestCase):
+
+  class RecordFn(beam.DoFn):
+    def process(self, elm=beam.DoFn.ElementParam, ts=beam.DoFn.TimestampParam):
+      yield (elm, ts)
+
+  @unittest.skip('BEAM-8657')
+  def test_combiner_earliest(self):
+    """Test TimestampCombiner with EARLIEST."""
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+
+    main_stream = (p
+                   | 'main TestStream' >> TestStream()
+                   .add_elements([window.TimestampedValue(('k', 100), 0)])
+                   .add_elements([window.TimestampedValue(('k', 400), 9)])
+                   .advance_watermark_to_infinity()
+                   | 'main windowInto' >> beam.WindowInto(
+                       window.FixedWindows(10),
+                       timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)
+                   | 'Combine' >> beam.CombinePerKey(sum))
+
+    records = (main_stream | beam.ParDo(self.RecordFn()))
+
+    # All the KV pairs are applied GBK using EARLIEST timestamp.
+    expected_window_to_elements = {
+        window.IntervalWindow(0, 10): [
+            (('k', 500), Timestamp(0)),
+        ],
+    }
+
+    assert_that(
+        records,
+        equal_to_per_window(expected_window_to_elements),
+        use_global_window=False,
+        label='assert per window')
+
+    p.run()
+
+  def test_combiner_latest(self):
+    """Test TimestampCombiner with LATEST."""
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+
+    main_stream = (p
+                   | 'main TestStream' >> TestStream()
+                   .add_elements([window.TimestampedValue(('k', 100), 0)])
+                   .add_elements([window.TimestampedValue(('k', 400), 9)])
+                   .advance_watermark_to_infinity()
+                   | 'main windowInto' >> beam.WindowInto(
+                       window.FixedWindows(10),
+                       timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)
 
 Review comment:
   Shouldn't this be LATEST?
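One way to avoid this kind of copy-paste slip is to drive both cases from one table, so that each combiner and its expected timestamp are stated side by side. The following sketch uses `unittest`'s `subTest`. `output_timestamp` is a hypothetical stand-in for running the real pipeline, and the string keys stand in for the `TimestampCombiner` members a real test would use.

```python
import unittest


def output_timestamp(combiner, stamps):
  """Hypothetical stand-in for the pipeline: EARLIEST -> min, LATEST -> max."""
  return {'EARLIEST': min, 'LATEST': max}[combiner](stamps)


class TimestampCombinerCases(unittest.TestCase):
  # Each combiner is paired with its expected output timestamp, so a
  # LATEST case cannot silently reuse the EARLIEST setting.
  CASES = {'EARLIEST': 2, 'LATEST': 7}

  def test_combiners(self):
    for combiner, expected in self.CASES.items():
      with self.subTest(combiner=combiner):
        self.assertEqual(output_timestamp(combiner, [2, 7]), expected)
```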
 



Issue Time Tracking
---

Worklog Id: (was: 346190)
Time Spent: 4h 20m  (was: 4h 10m)


[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-19 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=345844=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-345844
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 19/Nov/19 08:03
Start Date: 19/Nov/19 08:03
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10143: [BEAM-8645] To test 
state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#issuecomment-555381786
 
 
   R: @robertwb 
 



Issue Time Tracking
---

Worklog Id: (was: 345844)
Time Spent: 3h 50m  (was: 3h 40m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=345665=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-345665
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 18/Nov/19 23:24
Start Date: 18/Nov/19 23:24
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10143: [BEAM-8645] To test 
state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143#issuecomment-555257322
 
 
   Run Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 345665)
Time Spent: 3h 40m  (was: 3.5h)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=345640=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-345640
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 18/Nov/19 22:44
Start Date: 18/Nov/19 22:44
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10135: [BEAM-8645] Create 
a py test case for Re-iteration on GBK result. 
URL: https://github.com/apache/beam/pull/10135#issuecomment-555245258
 
 
   Thanks for reviewing. 
   
   Updated. 
 



Issue Time Tracking
---

Worklog Id: (was: 345640)
Time Spent: 3.5h  (was: 3h 20m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=345638=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-345638
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 18/Nov/19 22:40
Start Date: 18/Nov/19 22:40
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10135: [BEAM-8645] 
Create a py test case for Re-iteration on GBK result. 
URL: https://github.com/apache/beam/pull/10135#discussion_r347647686
 
 

 ##
 File path: sdks/python/apache_beam/transforms/ptransform_test.py
 ##
 @@ -471,6 +471,24 @@ def test_group_by_key(self):
 assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
 pipeline.run()
 
+  def test_group_by_key_reiteration(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, gbk_result):
+        value_list = gbk_result[1]
+        sum_val = 0
+        # Iterate the GBK result for multiple times.
+        for _ in range(0, 17):
+          sum_val += sum(value_list)
+        return (gbk_result[0], sum_val)
+
+    pipeline = TestPipeline()
+    pcoll = pipeline | 'start' >> beam.Create(
+        [(1, 1), (1, 2), (1, 3), (1, 4)])
+    result = (pcoll | 'Group' >> beam.GroupByKey()
+              | 'Reiteration-Sum' >> beam.ParDo(MyDoFn()))
+    assert_that(result, equal_to([1, 170]))
 
 Review comment:
   Ah, it turns out the DoFn above is supposed to return a list (an iterable of output elements) in the Python SDK.
   
   Thanks for catching this.  Fixed.  
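The Python SDK treats whatever `process` returns as an iterable of output elements. The plain-Python model below is a simplification, not the SDK's actual code. It shows why returning the bare tuple `(key, sum)` yields two separate elements, while returning `[(key, sum)]` yields one.

```python
def emitted_outputs(process_result):
  """Simplified model of how a runner expands DoFn.process's return value.

  The value is iterated and each item becomes a separate output element;
  None is treated as "no outputs".
  """
  if process_result is None:
    return []
  return list(process_result)
```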
   
   
 



Issue Time Tracking
---

Worklog Id: (was: 345638)
Time Spent: 3h 20m  (was: 3h 10m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=345637=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-345637
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 18/Nov/19 22:39
Start Date: 18/Nov/19 22:39
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10135: [BEAM-8645] 
Create a py test case for Re-iteration on GBK result. 
URL: https://github.com/apache/beam/pull/10135#discussion_r347647417
 
 

 ##
 File path: sdks/python/apache_beam/transforms/ptransform_test.py
 ##
 @@ -471,6 +471,24 @@ def test_group_by_key(self):
 assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
 pipeline.run()
 
+  def test_group_by_key_reiteration(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, gbk_result):
+        value_list = gbk_result[1]
 
 Review comment:
   Thanks! Fixed. 
 



Issue Time Tracking
---

Worklog Id: (was: 345637)
Time Spent: 3h 10m  (was: 3h)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=345610=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-345610
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 18/Nov/19 22:08
Start Date: 18/Nov/19 22:08
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10135: [BEAM-8645] 
Create a py test case for Re-iteration on GBK result. 
URL: https://github.com/apache/beam/pull/10135#discussion_r347634526
 
 

 ##
 File path: sdks/python/apache_beam/transforms/ptransform_test.py
 ##
 @@ -471,6 +471,24 @@ def test_group_by_key(self):
 assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
 pipeline.run()
 
+  def test_group_by_key_reiteration(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, gbk_result):
+        value_list = gbk_result[1]
 
 Review comment:
   More idiomatic to do `key, value_list = gbk_result`
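Here is a sketch of the suggested unpacking in a `process` body. It is written as a plain function, with a hypothetical name, so it can be run without Beam. It also returns a one-element list, since `process` must return an iterable of outputs.

```python
def process_group(gbk_result):
  # Unpack the (key, values) pair instead of indexing gbk_result[0] / [1].
  key, value_list = gbk_result
  # Wrap the single output in a list: process returns an iterable of elements.
  return [(key, sum(value_list))]
```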
 



Issue Time Tracking
---

Worklog Id: (was: 345610)
Time Spent: 2h 50m  (was: 2h 40m)



[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=345611=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-345611
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 18/Nov/19 22:08
Start Date: 18/Nov/19 22:08
Worklog Time Spent: 10m 
  Work Description: robertwb commented on pull request #10135: [BEAM-8645] 
Create a py test case for Re-iteration on GBK result. 
URL: https://github.com/apache/beam/pull/10135#discussion_r347635063
 
 

 ##
 File path: sdks/python/apache_beam/transforms/ptransform_test.py
 ##
 @@ -471,6 +471,24 @@ def test_group_by_key(self):
 assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
 pipeline.run()
 
+  def test_group_by_key_reiteration(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, gbk_result):
+        value_list = gbk_result[1]
+        sum_val = 0
+        # Iterate the GBK result for multiple times.
+        for _ in range(0, 17):
+          sum_val += sum(value_list)
+        return (gbk_result[0], sum_val)
+
+    pipeline = TestPipeline()
+    pcoll = pipeline | 'start' >> beam.Create(
+        [(1, 1), (1, 2), (1, 3), (1, 4)])
+    result = (pcoll | 'Group' >> beam.GroupByKey()
+              | 'Reiteration-Sum' >> beam.ParDo(MyDoFn()))
+    assert_that(result, equal_to([1, 170]))
 
 Review comment:
   `equal_to` takes a list of PCollection elements. This should be 
`equal_to([(1, 170)])`. 
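The review point hinges on how the return value of a DoFn's `process` is treated: the runner iterates over whatever `process` returns, so returning the bare tuple `(key, total)` emits `key` and `total` as two separate elements, which is why `equal_to([1, 170])` would match the test as written. The sketch below models this in plain Python without importing Beam; `ReiterationSum` and `emitted` are hypothetical stand-ins, not Beam APIs.

```python
class ReiterationSum(object):
    """Plain-Python stand-in for the MyDoFn under review (no Beam import)."""

    def process_as_written(self, gbk_result):
        key, values = gbk_result
        # Sum the grouped values once per pass, 17 passes in total.
        total = sum(sum(values) for _ in range(17))
        # A bare tuple is iterated by the runner, so key and total
        # become two separate output elements.
        return (key, total)

    def process_fixed(self, gbk_result):
        key, values = gbk_result
        total = sum(sum(values) for _ in range(17))
        # Yielding the pair emits exactly one (key, total) element.
        yield (key, total)


def emitted(process_result):
    """Hypothetical helper: collect outputs the way a runner iterates them."""
    return [] if process_result is None else list(process_result)


fn = ReiterationSum()
grouped = (1, [1, 2, 3, 4])
print(emitted(fn.process_as_written(grouped)))  # [1, 170]
print(emitted(fn.process_fixed(grouped)))       # [(1, 170)]
```

In the actual test, changing `return (gbk_result[0], sum_val)` to `yield (gbk_result[0], sum_val)` (or `return [(gbk_result[0], sum_val)]`) would make `equal_to([(1, 170)])` the matching assertion.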
 



Issue Time Tracking
---

Worklog Id: (was: 345611)
Time Spent: 3h  (was: 2h 50m)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> When we combine TimestampedValue elements with a TimestampCombiner:
> {code:python}
> import apache_beam as beam
> from apache_beam.testing.test_stream import TestStream
> from apache_beam.transforms import window
> from apache_beam.transforms.window import TimestampCombiner
>
> # Assumes `p` is a pipeline running in streaming mode.
> main_stream = (p
>     | 'main TestStream' >> TestStream()
>         .add_elements([window.TimestampedValue(('k', 100), 0)])
>         .add_elements([window.TimestampedValue(('k', 400), 9)])
>         .advance_watermark_to_infinity()
>     | 'main windowInto' >> beam.WindowInto(
>         window.FixedWindows(10),
>         timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
>     | 'Combine' >> beam.CombinePerKey(sum))
> {code}
> The expected timestamps should be:
> LATEST:        (('k', 500), Timestamp(9)),
> EARLIEST:      (('k', 500), Timestamp(0)),
> END_OF_WINDOW: (('k', 500), Timestamp(10)),
> But the current Python streaming implementation gives the following results:
> LATEST:        (('k', 500), Timestamp(10)),
> EARLIEST:      (('k', 500), Timestamp(10)),
> END_OF_WINDOW: (('k', 500), Timestamp(9.)),
> More details and discussion:
> https://lists.apache.org/thread.html/d3af1f2f84a2e59a747196039eae77812b78a991f0f293c717e5f4e1@%3Cdev.beam.apache.org%3E
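For the example in the description (values 100 at t=0 and 400 at t=9, both falling in the fixed window [0, 10)), the EARLIEST and LATEST rules reduce to taking the min or the max of the input timestamps within each key and window. The plain-Python sketch below reproduces the expected values 0 and 9; it does not use Beam, and `combine_with_timestamps` and its `mode` strings are hypothetical names. END_OF_WINDOW is left out because its exact value depends on how the window bound is represented.

```python
from collections import defaultdict


def fixed_window_start(ts, size):
    """Start of the fixed window [start, start + size) that contains ts."""
    return ts - (ts % size)


def combine_with_timestamps(elements, size, mode):
    """Sum values per (key, window) and choose an output timestamp.

    elements: iterable of (key, value, timestamp) triples.
    mode: 'EARLIEST' or 'LATEST', loosely mirroring
        TimestampCombiner.OUTPUT_AT_EARLIEST / OUTPUT_AT_LATEST.
    Returns {(key, window_start): ((key, total), output_timestamp)}.
    """
    pick = {'EARLIEST': min, 'LATEST': max}[mode]
    totals = defaultdict(int)
    stamps = defaultdict(list)
    for key, value, ts in elements:
        kw = (key, fixed_window_start(ts, size))
        totals[kw] += value
        stamps[kw].append(ts)
    return {kw: ((kw[0], total), pick(stamps[kw]))
            for kw, total in totals.items()}


elements = [('k', 100, 0), ('k', 400, 9)]
print(combine_with_timestamps(elements, 10, 'LATEST'))    # {('k', 0): (('k', 500), 9)}
print(combine_with_timestamps(elements, 10, 'EARLIEST'))  # {('k', 0): (('k', 500), 0)}
```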





[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=345587=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-345587
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 18/Nov/19 21:12
Start Date: 18/Nov/19 21:12
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10135: [BEAM-8645] Create 
a py test case for Re-iteration on GBK result. 
URL: https://github.com/apache/beam/pull/10135#issuecomment-554583097
 
 
   R: @robertwb , @pabloem 
   
   Thanks. 
   
 



Issue Time Tracking
---

Worklog Id: (was: 345587)
Time Spent: 2h 40m  (was: 2.5h)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 2h 40m
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=345586=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-345586
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 18/Nov/19 21:12
Start Date: 18/Nov/19 21:12
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10135: [BEAM-8645] Create 
a py test case for Re-iteration on GBK result. 
URL: https://github.com/apache/beam/pull/10135#issuecomment-555209852
 
 
   CC: @pabloem 
 



Issue Time Tracking
---

Worklog Id: (was: 345586)
Time Spent: 2.5h  (was: 2h 20m)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 2.5h
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=345584=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-345584
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 18/Nov/19 21:09
Start Date: 18/Nov/19 21:09
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10135: [BEAM-8645] Create 
a py test case for Re-iteration on GBK result. 
URL: https://github.com/apache/beam/pull/10135#issuecomment-555209852
 
 
   CC: @pabloem 
 



Issue Time Tracking
---

Worklog Id: (was: 345584)
Time Spent: 2h 20m  (was: 2h 10m)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 2h 20m
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=345539=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-345539
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 18/Nov/19 19:32
Start Date: 18/Nov/19 19:32
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10143: [BEAM-8645] 
To test state backed iterable coder in py sdk.
URL: https://github.com/apache/beam/pull/10143
 
 
   To test the state-backed iterable coder in the Python SDK.
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ X] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ X] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   

[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=345520=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-345520
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 18/Nov/19 18:57
Start Date: 18/Nov/19 18:57
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10135: [BEAM-8645] Create 
a py test case for Re-iteration on GBK result. 
URL: https://github.com/apache/beam/pull/10135#issuecomment-554583097
 
 
   R: @robertwb   
   
   Thanks. 
   
   Will look for a committer if this makes sense in the Python SDK.
 



Issue Time Tracking
---

Worklog Id: (was: 345520)
Time Spent: 2h  (was: 1h 50m)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 2h
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=344735=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-344735
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 16/Nov/19 00:55
Start Date: 16/Nov/19 00:55
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10135: [BEAM-8645] Create 
a py test case for Re-iteration on GBK result. 
URL: https://github.com/apache/beam/pull/10135#issuecomment-554583097
 
 
   R: @ananvay  
   
   Thanks. 
   
   Will look for a committer if this makes sense in the Python SDK.
 



Issue Time Tracking
---

Worklog Id: (was: 344735)
Time Spent: 1h 50m  (was: 1h 40m)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 1h 50m
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=344726=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-344726
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 16/Nov/19 00:43
Start Date: 16/Nov/19 00:43
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10135: [BEAM-8645] Create 
a py test case for Re-iteration on GBK result. 
URL: https://github.com/apache/beam/pull/10135#issuecomment-554583097
 
 
   R: @ananvay  
   
   Thanks. 
   
   Will look for a committer once this looks good.
 



Issue Time Tracking
---

Worklog Id: (was: 344726)
Time Spent: 1h 40m  (was: 1.5h)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 1h 40m
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=344724=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-344724
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 16/Nov/19 00:38
Start Date: 16/Nov/19 00:38
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10135: [BEAM-8645] Create 
a py test case for Re-iteration on GBK result. 
URL: https://github.com/apache/beam/pull/10135#issuecomment-554583097
 
 
   R: @ananvay 
 



Issue Time Tracking
---

Worklog Id: (was: 344724)
Time Spent: 1.5h  (was: 1h 20m)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 1.5h
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=344714=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-344714
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 16/Nov/19 00:25
Start Date: 16/Nov/19 00:25
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10135: [BEAM-8645] 
Create a py test case for Re-iteration on GBK result. 
URL: https://github.com/apache/beam/pull/10135
 
 
   Create a Python test case that iterates over a GBK result multiple times.
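The reason to test re-iteration is that the grouped-values iterable handed to a DoFn must support more than one pass. A one-shot iterator would contribute its values only on the first pass, so a 17-pass sum over the values 1, 2, 3, 4 would come out as 10 instead of 170. A plain-Python sketch of the difference; `ReiterableValues` and `reiteration_sum` are hypothetical names, not Beam classes.

```python
class ReiterableValues(object):
    """Hypothetical stand-in for a grouped-values object that can be re-iterated.

    Every call to __iter__ starts a fresh pass over the underlying data.
    """

    def __init__(self, values):
        self._values = list(values)

    def __iter__(self):
        return iter(self._values)


def reiteration_sum(values, passes=17):
    # Sum the values once per pass; an exhausted iterator adds 0 on later passes.
    return sum(sum(values) for _ in range(passes))


print(reiteration_sum(iter([1, 2, 3, 4])))              # 10 (one-shot iterator)
print(reiteration_sum(ReiterableValues([1, 2, 3, 4])))  # 170 (re-iterable)
```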
   
   

[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-15 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=344476=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-344476
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 15/Nov/19 18:44
Start Date: 15/Nov/19 18:44
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10081: [BEAM-8645] A test 
case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#issuecomment-554480944
 
 
   Hi, Robert,
   
   I synced with your PR that fixes combiner lifting. LATEST now works (I 
removed the skip tag), but EARLIEST is still not correct.
   
   Maybe we can merge this test case first, so that you can test your 
upcoming PR against it more easily. WDYT?
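Combiner lifting splits a combine into a partial (pre-GroupByKey) stage and a merge stage. Because `min` and `max` are associative, applying the EARLIEST or LATEST rule to each partial result and then again across the partials gives the same timestamp as applying it to all inputs at once. The sketch below models only that composition in plain Python (`merge_partial_timestamps` is a hypothetical helper). The last two calls show that if a partial stage attached the window end (10) to its output instead, both rules would report 10, which would match the EARLIEST and LATEST results reported in the issue; this thread does not say whether that is the actual cause.

```python
def merge_partial_timestamps(bundles, pick, stamp_partial=None):
    """Model the output timestamp of a lifted (two-stage) combine.

    bundles: lists of element timestamps, one list per hypothetical bundle.
    pick: min for EARLIEST, max for LATEST.
    stamp_partial: if given, the timestamp attached to every partial result
        instead of pick(bundle), to model a pre-combine stage that ignores
        the element timestamps.
    """
    partial_stamps = [
        pick(bundle) if stamp_partial is None else stamp_partial
        for bundle in bundles if bundle
    ]
    # The merge stage applies the same rule to the partial timestamps.
    return pick(partial_stamps)


bundles = [[0], [9]]  # timestamps 0 and 9 split across two bundles
print(merge_partial_timestamps(bundles, min))                    # 0
print(merge_partial_timestamps(bundles, max))                    # 9
print(merge_partial_timestamps(bundles, min, stamp_partial=10))  # 10
print(merge_partial_timestamps(bundles, max, stamp_partial=10))  # 10
```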
   
   
 



Issue Time Tracking
---

Worklog Id: (was: 344476)
Time Spent: 1h 10m  (was: 1h)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 1h 10m
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=344036=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-344036
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 15/Nov/19 07:48
Start Date: 15/Nov/19 07:48
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10081: [BEAM-8645] A test 
case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#issuecomment-553685518
 
 
   R: @robertwb 
   
   PreCommit failed once, but I don't think this PR caused it (after all, we 
only added two tests, and they are disabled; I tested these two new cases on my 
local machine and they passed).
 



Issue Time Tracking
---

Worklog Id: (was: 344036)
Time Spent: 1h  (was: 50m)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
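> When we combine TimestampedValue elements with a TimestampCombiner set on the window: 
> {code:python}
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.testing.test_stream import TestStream
> from apache_beam.transforms import window
> from apache_beam.transforms.window import TimestampCombiner
>
> with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
>   main_stream = (
>       p
>       | 'main TestStream' >> TestStream()
>       .add_elements([window.TimestampedValue(('k', 100), 0)])
>       .add_elements([window.TimestampedValue(('k', 400), 9)])
>       .advance_watermark_to_infinity()
>       | 'main windowInto' >> beam.WindowInto(
>           window.FixedWindows(10),
>           timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST)
>       | 'Combine' >> beam.CombinePerKey(sum))
> {code}
> With timestamp_combiner set to OUTPUT_AT_LATEST, OUTPUT_AT_EARLIEST, and OUTPUT_AT_EOW respectively, the expected timestamps are:
> {code}
> LATEST:        (('k', 500), Timestamp(9)),
> EARLIEST:      (('k', 500), Timestamp(0)),
> END_OF_WINDOW: (('k', 500), Timestamp(10)),
> {code}
> But the current Python streaming implementation gives the following results: 
> {code}
> LATEST:        (('k', 500), Timestamp(10)),
> EARLIEST:      (('k', 500), Timestamp(10)),
> END_OF_WINDOW: (('k', 500), Timestamp(9.)),
> {code}
> More details and discussions:
> https://lists.apache.org/thread.html/d3af1f2f84a2e59a747196039eae77812b78a991f0f293c717e5f4e1@%3Cdev.beam.apache.org%3E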
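The combining rules the report expects can be modeled without a runner. The sketch below is a minimal pure-Python illustration under stated assumptions, not Beam's implementation: `Combiner`, `fixed_window`, `output_timestamp`, and `combine_per_key` are hypothetical helpers, and end of window is modeled as the window's end boundary so that it matches the expected value in the report (a runner may instead use the window's maximum timestamp just below the end). Each (key, window) pane sums its values and folds the element timestamps with `min` for EARLIEST or `max` for LATEST, or ignores them for END_OF_WINDOW.

```python
from enum import Enum


class Combiner(Enum):
    # Hypothetical stand-ins for the three policies named in the report;
    # these are NOT apache_beam's TimestampCombiner values.
    OUTPUT_AT_EARLIEST = "EARLIEST"
    OUTPUT_AT_LATEST = "LATEST"
    OUTPUT_AT_EOW = "END_OF_WINDOW"


def fixed_window(ts, size):
    """Return the [start, end) fixed window that contains timestamp ts."""
    start = ts - (ts % size)
    return start, start + size


def output_timestamp(timestamps, window_end, combiner):
    """Fold the element timestamps of one pane into a single output timestamp."""
    if combiner is Combiner.OUTPUT_AT_EARLIEST:
        return min(timestamps)
    if combiner is Combiner.OUTPUT_AT_LATEST:
        return max(timestamps)
    # Assumption: end of window is modeled as the window's end boundary,
    # matching the report's expected value rather than any runner's choice.
    return window_end


def combine_per_key(elements, size, combiner):
    """Sum (key, value, timestamp) triples per (key, fixed window) and attach
    the output timestamp chosen by the given policy."""
    panes = {}
    for key, value, ts in elements:
        win = fixed_window(ts, size)
        total, stamps = panes.get((key, win), (0, []))
        panes[(key, win)] = (total + value, stamps + [ts])
    return [
        ((key, total), output_timestamp(stamps, win[1], combiner))
        for (key, win), (total, stamps) in panes.items()
    ]


# The two elements from the report: ('k', 100) at t=0 and ('k', 400) at t=9.
elements = [('k', 100, 0), ('k', 400, 9)]
for c in Combiner:
    print(c.value, combine_per_key(elements, 10, c))
# EARLIEST [(('k', 500), 0)]
# LATEST [(('k', 500), 9)]
# END_OF_WINDOW [(('k', 500), 10)]
```

Under this model both elements land in the window [0, 10), so an EARLIEST or LATEST result of Timestamp(10) matches neither the minimum nor the maximum of the input timestamps.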





[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=343070=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-343070
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 14/Nov/19 01:47
Start Date: 14/Nov/19 01:47
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10081: [BEAM-8645] A test 
case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#issuecomment-553686050
 
 
   Run Portable_Python PreCommit
 



Issue Time Tracking
---

Worklog Id: (was: 343070)
Time Spent: 50m  (was: 40m)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 50m
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=343069=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-343069
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 14/Nov/19 01:47
Start Date: 14/Nov/19 01:47
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10081: [BEAM-8645] A test 
case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#issuecomment-553685518
 
 
   R: @robertwb 
   
   PreCommit failed once, but I don't think this PR caused it. (After all, we 
only added two tests, and they are disabled. I tested these two new cases on a 
local machine, and they worked.) 
 



Issue Time Tracking
---

Worklog Id: (was: 343069)
Time Spent: 40m  (was: 0.5h)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=343068=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-343068
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 14/Nov/19 01:45
Start Date: 14/Nov/19 01:45
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10081: [BEAM-8645] A test 
case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#issuecomment-553685518
 
 
   R: @robertwb 
   
   PreCommit failed once, but I don't think my PR caused it. (After all, we 
only added two tests, and they are disabled. I tested these two new cases on a 
local machine, and they worked.) 
 



Issue Time Tracking
---

Worklog Id: (was: 343068)
Time Spent: 0.5h  (was: 20m)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-13 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=343029=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-343029
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 14/Nov/19 00:27
Start Date: 14/Nov/19 00:27
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on issue #10081: [BEAM-8645] A test 
case for TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081#issuecomment-553667615
 
 
   R: @robertwb 
 



Issue Time Tracking
---

Worklog Id: (was: 343029)
Time Spent: 20m  (was: 10m)

> TimestampCombiner incorrect in beam python
> --
>
> Key: BEAM-8645
> URL: https://issues.apache.org/jira/browse/BEAM-8645
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Reporter: Ruoyun Huang
>Priority: Major
>  Time Spent: 20m
>  Remaining Estimate: 0h
>


[jira] [Work logged] (BEAM-8645) TimestampCombiner incorrect in beam python

2019-11-12 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-8645?focusedWorklogId=342317=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-342317
 ]

ASF GitHub Bot logged work on BEAM-8645:


Author: ASF GitHub Bot
Created on: 13/Nov/19 00:44
Start Date: 13/Nov/19 00:44
Worklog Time Spent: 10m 
  Work Description: HuangLED commented on pull request #10081: [BEAM-8645] 
A test case to show the expected behavior of TimestampCombiner.
URL: https://github.com/apache/beam/pull/10081
 
 
   This PR does not resolve BEAM-8645; rather, it shows how to reproduce the 
(likely) bug. 
   
   
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
- [ X] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
- [ X] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
- [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   

   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)[![Build