[ https://issues.apache.org/jira/browse/BEAM-10617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Leiyi Zhang updated BEAM-10617:
-------------------------------
Description:
Here is some code to reproduce the issue. Run it with and without
{{*.with_fanout*}}: with the fanout, the output contains duplicate combine results for the sliding windows.
{code:python}
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.utils.timestamp import Timestamp


class ListFn(beam.CombineFn):
    """Collects every element of a window into a single list."""

    def create_accumulator(self):
        return []

    def add_input(self, mutable_accumulator, element):
        return mutable_accumulator + [element]

    def merge_accumulators(self, accumulators):
        res = []
        for accu in accumulators:
            res = res + accu
        return res

    def extract_output(self, accumulator):
        return accumulator


p = beam.Pipeline()
(
    p
    | beam.Create([
        window.TimestampedValue(1, Timestamp(seconds=1596216396)),
        window.TimestampedValue(2, Timestamp(seconds=1596216397)),
        window.TimestampedValue(3, Timestamp(seconds=1596216398)),
        window.TimestampedValue(4, Timestamp(seconds=1596216399)),
        window.TimestampedValue(5, Timestamp(seconds=1596216400)),
        window.TimestampedValue(6, Timestamp(seconds=1596216402)),
        window.TimestampedValue(7, Timestamp(seconds=1596216403)),
        window.TimestampedValue(8, Timestamp(seconds=1596216405))])
    # 10-second windows starting every 5 seconds: each element lands in 2 windows.
    | beam.WindowInto(window.SlidingWindows(10, 5))
    # Remove .with_fanout(5) to get the reference (non-fanout) output.
    | beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5)
    | beam.Map(repr)
    | beam.io.WriteToText("py-test-result", file_name_suffix='.json',
                          num_shards=1))
p.run()
{code}
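To make the duplicates easy to spot, here is a plain-Python sketch (no Beam needed) that works out which values each sliding window should combine and flags output lines that occur more often than expected. It assumes that SlidingWindows(10, 5) assigns each timestamp to every window [start, start + 10) whose start is a multiple of 5 (offset 0), which is my understanding of its behavior. The helpers window_starts, expected_window_contents and extra_results are hypothetical names for illustration, not Beam APIs.

```python
import ast
from collections import Counter, defaultdict

SIZE, PERIOD = 10, 5  # matches window.SlidingWindows(10, 5) in the repro

# (value, event-time seconds) pairs copied from the beam.Create call in the repro.
DATA = [
    (1, 1596216396), (2, 1596216397), (3, 1596216398), (4, 1596216399),
    (5, 1596216400), (6, 1596216402), (7, 1596216403), (8, 1596216405),
]


def window_starts(ts, size, period, offset=0):
    """Start times of every sliding window [start, start + size) containing ts.

    Assumption: windows are aligned to `offset` and start every `period` seconds.
    """
    last_start = ts - (ts - offset) % period
    return list(range(last_start, ts - size, -period))


def expected_window_contents(data, size, period):
    """Map each (start, end) window to the sorted values it should combine."""
    windows = defaultdict(list)
    for value, ts in data:
        for start in window_starts(ts, size, period):
            windows[(start, start + size)].append(value)
    return {w: sorted(vals) for w, vals in sorted(windows.items())}


def extra_results(output_lines, expected):
    """Return output lists that appear more often than expected (duplicates).

    The pipeline writes repr(list) once per window, and element order inside
    a list is not guaranteed, so each list is compared as a sorted tuple.
    """
    actual = Counter(
        tuple(sorted(ast.literal_eval(line)))
        for line in output_lines
        if line.strip()
    )
    wanted = Counter(tuple(vals) for vals in expected.values())
    return actual - wanted  # Counter subtraction keeps only positive surpluses


if __name__ == "__main__":
    for (start, end), values in expected_window_contents(DATA, SIZE, PERIOD).items():
        print(f"[{start}, {end}): {values}")
```

Under these assumptions there are four windows, and each should yield exactly one list: [1596216390, 1596216400) -> [1, 2, 3, 4], [1596216395, 1596216405) -> [1, 2, 3, 4, 5, 6, 7], [1596216400, 1596216410) -> [5, 6, 7, 8] and [1596216405, 1596216415) -> [8]. With the default shard naming the single output file should be py-test-result-00000-of-00001.json; feeding its lines to extra_results for each run should return an empty Counter for the non-fanout run, and any non-empty result points at duplicated windows.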
was:
Here is some code to reproduce the issue. Run it with and without
{{.with_fanout}}.
> Python CombineGlobally().with_fanout() causes duplicate combine results for
> sliding windows
> ------------------------------------------------------------------------------------------
>
> Key: BEAM-10617
> URL: https://issues.apache.org/jira/browse/BEAM-10617
> Project: Beam
> Issue Type: Bug
> Components: runner-direct, sdk-py-core
> Reporter: Leiyi Zhang
> Priority: P2
--
This message was sent by Atlassian Jira
(v8.3.4#803005)