[jira] [Updated] (BEAM-6177) AfterProcessingTime not firing
[ https://issues.apache.org/jira/browse/BEAM-6177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot updated BEAM-6177:
    Priority: P3  (was: P2)

> AfterProcessingTime not firing
> ------------------------------
>
>                 Key: BEAM-6177
>                 URL: https://issues.apache.org/jira/browse/BEAM-6177
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.8.0
>            Reporter: Arnaud T
>            Priority: P3
>              Labels: stale-P2
>
> Hi,
>
> Documentation says that an AfterProcessingTime(X) trigger should fire X
> seconds after the first element is processed, but it appears that this
> trigger never fires when using a Global window on a steady influx of elements.
> Here is my pipeline:
>
> {code:java}
> (p
>  | 'pubsub' >> beam.io.ReadFromPubSub(topic=self._pubsub_topic).with_output_types(bytes)
>  | 'window' >> beam.WindowInto(
>      window.GlobalWindows(),
>      trigger=Repeatedly(AfterProcessingTime(5)),
>      accumulation_mode=AccumulationMode.DISCARDING
>  )
>  | 'decode' >> beam.FlatMap(converter.from_pubsub).with_output_types(PubSubRow)
>  | 'row' >> beam.Map(lambda x: converter.to_clickhouse(x.type, x.data))
>  | 'combine' >> beam.CombineGlobally(ListCombineFn()).without_defaults()
>  | 'clickhouse' >> ClickHouseSink(self._clickhouse_host,
>      self._clickhouse_port, self._clickhouse_database)
> )
> {code}
>
> I expect that every 5 seconds (as long as elements are pouring in), the
> trigger would fire and my data would be combined. The idea of this pipeline
> is simply to get messages from PubSub, transform them into ClickHouse ORM
> models and then batch save them into ClickHouse, using as much parallelism as
> possible. We do not care about order, etc. Elements can be inserted in any
> order and are not correlated to one another.
>
> The potential issue is in _class
> AfterProcessingTime(TriggerFn)::on_element(self, element, window, context) in
> trigger.py_:
>
> {code:java}
> context.set_timer(
>     '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
> {code}
>
> This will basically override the previously set timer every time a new
> element comes in, and in the case of a constant influx of elements, the
> trigger only fires once we have no more elements for X seconds.
>
> Please let me know if I understood the documentation right, and if I can
> help further.
>
> Thank you,

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
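
The timer behaviour described in the report can be illustrated without Beam. What follows is a minimal sketch in plain Python, not the SDK's implementation: `firing_times` and `reset_on_each_element` are hypothetical names introduced here, and the model assumes a single key in a single global window with a repeated processing-time trigger. It compares a timer that is rewritten on every element (the behaviour the reporter attributes to `on_element`) with a timer that is set once per pane (the behaviour the reporter expects from the documentation).

```python
from typing import List, Optional


def firing_times(arrivals: List[float], delay: float,
                 reset_on_each_element: bool) -> List[float]:
    """Return the processing times at which a repeated trigger would fire.

    arrivals must be sorted in ascending order.
    reset_on_each_element=True: every element overwrites the pending timer
    with (arrival + delay), as described in the report.
    reset_on_each_element=False: only the first element of a pane sets the
    timer; later elements leave it untouched until it fires.
    """
    fired: List[float] = []
    deadline: Optional[float] = None  # pending timer, None when unset
    for t in arrivals:
        # Fire the pending timer if it expired before this element arrived.
        if deadline is not None and deadline <= t:
            fired.append(deadline)
            deadline = None
        # Set the timer (or overwrite it, depending on the policy).
        if deadline is None or reset_on_each_element:
            deadline = t + delay
    # Once input stops, the last pending timer is free to fire.
    if deadline is not None:
        fired.append(deadline)
    return fired


# One element per second for 20 seconds, with a 5-second delay.
steady = [float(i) for i in range(21)]
print(firing_times(steady, 5.0, reset_on_each_element=True))
print(firing_times(steady, 5.0, reset_on_each_element=False))
```

In this model the resetting variant fires only once, 5 seconds after the last element, because each arrival pushes the deadline past the next arrival. The set-once variant fires every 5 seconds while elements keep arriving.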
[jira] [Updated] (BEAM-6177) AfterProcessingTime not firing
[ https://issues.apache.org/jira/browse/BEAM-6177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot updated BEAM-6177:
    Labels:   (was: stale-P2)

> AfterProcessingTime not firing
> ------------------------------
>
>                 Key: BEAM-6177
>                 URL: https://issues.apache.org/jira/browse/BEAM-6177
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.8.0
>            Reporter: Arnaud T
>            Priority: P3
[jira] [Updated] (BEAM-6177) AfterProcessingTime not firing
[ https://issues.apache.org/jira/browse/BEAM-6177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot updated BEAM-6177:
    Labels: stale-P2  (was: )

> AfterProcessingTime not firing
> ------------------------------
>
>                 Key: BEAM-6177
>                 URL: https://issues.apache.org/jira/browse/BEAM-6177
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.8.0
>            Reporter: Arnaud T
>            Priority: P2
>              Labels: stale-P2