[ https://issues.apache.org/jira/browse/BEAM-10617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17548918#comment-17548918 ]

Danny McCormick commented on BEAM-10617:
----------------------------------------
This issue has been migrated to https://github.com/apache/beam/issues/20528
> python CombineGlobally().with_fanout() cause duplicate combine results for
> sliding windows
> ------------------------------------------------------------------------------------------
>
> Key: BEAM-10617
> URL: https://issues.apache.org/jira/browse/BEAM-10617
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow, runner-direct, sdk-py-core
> Reporter: Leiyi Zhang
> Priority: P1
>
> Not only is there more than one result per window, but the results for each
> window are duplicated as well.
> Here is some code to reproduce the issue; run it once with and once without
> {{*.with_fanout*}} and compare the output.
> If running with the Dataflow runner, use an appropriate {{*gs://path/*}} in
> {{*WriteToText*}}.
>
> {code:python}
> import apache_beam as beam
> from apache_beam.transforms import window
> from apache_beam.utils.timestamp import Timestamp
>
> class ListFn(beam.CombineFn):
>     def create_accumulator(self):
>         return []
>     def add_input(self, mutable_accumulator, element):
>         return mutable_accumulator + [element]
>     def merge_accumulators(self, accumulators):
>         res = []
>         for accu in accumulators:
>             res = res + accu
>         return res
>     def extract_output(self, accumulator):
>         return accumulator
>
> p = beam.Pipeline()
> (
>     p
>     | beam.Create([
>         window.TimestampedValue(1, Timestamp(seconds=1596216396)),
>         window.TimestampedValue(2, Timestamp(seconds=1596216397)),
>         window.TimestampedValue(3, Timestamp(seconds=1596216398)),
>         window.TimestampedValue(4, Timestamp(seconds=1596216399)),
>         window.TimestampedValue(5, Timestamp(seconds=1596216400)),
>         window.TimestampedValue(6, Timestamp(seconds=1596216402)),
>         window.TimestampedValue(7, Timestamp(seconds=1596216403)),
>         window.TimestampedValue(8, Timestamp(seconds=1596216405))])
>     | beam.WindowInto(window.SlidingWindows(10, 5))
>     | beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5)
>     | beam.Map(repr)
>     | beam.io.WriteToText("py-test-result", file_name_suffix='.json',
>                           num_shards=1))
> p.run()
> {code}
>
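
For anyone comparing the two runs, here is a small sketch of what a correct result should look like. It is not part of the original report and does not use Beam. It assumes sliding windows of size 10 and period 5 with offset 0, that each window is the half-open interval [start, start + size), and that each output line is the repr of a Python list (as produced by beam.Map(repr) above). The helper names (sliding_window_starts, expected_results, surplus_results) are made up for illustration.

```python
import ast
from collections import Counter, defaultdict


def sliding_window_starts(timestamp, size, period, offset=0):
    """Start of every sliding window [start, start + size) containing timestamp."""
    last_start = timestamp - ((timestamp - offset) % period)
    return list(range(last_start, timestamp - size, -period))


def expected_results(timestamped_values, size, period):
    """Group values by window start: one combined list per window."""
    per_window = defaultdict(list)
    for value, ts in timestamped_values:
        for start in sliding_window_starts(ts, size, period):
            per_window[start].append(value)
    return dict(sorted(per_window.items()))


def surplus_results(output_lines, expected):
    """Results present in the output beyond one per expected window.

    Element order inside a combined list is not compared, so each list
    is sorted before counting.
    """
    actual = Counter(
        tuple(sorted(ast.literal_eval(line)))
        for line in output_lines if line.strip())
    wanted = Counter(tuple(sorted(values)) for values in expected.values())
    return actual - wanted


# Same values and timestamps (in seconds) as in the reproduction above.
ELEMENTS = [
    (1, 1596216396), (2, 1596216397), (3, 1596216398), (4, 1596216399),
    (5, 1596216400), (6, 1596216402), (7, 1596216403), (8, 1596216405),
]
EXPECTED = expected_results(ELEMENTS, size=10, period=5)

for start, values in EXPECTED.items():
    print(start, start + 10, values)
```

Under these assumptions there should be exactly four results, one per window: [1, 2, 3, 4], [1, 2, 3, 4, 5, 6, 7], [5, 6, 7, 8] and [8]. Passing the lines of the written output file to surplus_results(lines, EXPECTED) should give an empty Counter for a correct run; any partial or repeated list shows up with its surplus count.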
--
This message was sent by Atlassian Jira
(v8.20.7#820007)