[jira] [Updated] (BEAM-6177) AfterProcessingTime not firing

2020-06-16 Thread Beam JIRA Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Beam JIRA Bot updated BEAM-6177:

Priority: P3  (was: P2)

> AfterProcessingTime not firing
> --
>
>                 Key: BEAM-6177
>                 URL: https://issues.apache.org/jira/browse/BEAM-6177
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.8.0
>            Reporter: Arnaud T
>            Priority: P3
>              Labels: stale-P2
>
> Hi,
>  
> The documentation says that an AfterProcessingTime(X) trigger should fire X 
> seconds after the first element is processed, but it appears that this 
> trigger never fires when using a Global window on a steady influx of elements.
> Here is my pipeline:
>  
> {code:python}
> (p
>  | 'pubsub' >> beam.io.ReadFromPubSub(
>      topic=self._pubsub_topic).with_output_types(bytes)
>  | 'window' >> beam.WindowInto(
>      window.GlobalWindows(),
>      trigger=Repeatedly(AfterProcessingTime(5)),
>      accumulation_mode=AccumulationMode.DISCARDING)
>  | 'decode' >> beam.FlatMap(converter.from_pubsub).with_output_types(PubSubRow)
>  | 'row' >> beam.Map(lambda x: converter.to_clickhouse(x.type, x.data))
>  | 'combine' >> beam.CombineGlobally(ListCombineFn()).without_defaults()
>  | 'clickhouse' >> ClickHouseSink(
>      self._clickhouse_host, self._clickhouse_port, self._clickhouse_database)
> )
> {code}
>  
>  
> I expect that every 5 seconds (as long as elements keep arriving), the 
> trigger would fire and my data would be combined. The idea of this pipeline 
> is simply to read messages from PubSub, transform them into ClickHouse ORM 
> models, and then batch-save them into ClickHouse with as much parallelism as 
> possible. We do not care about ordering: elements can be inserted in any 
> order and are not correlated with one another.
> The potential issue is in _class 
> AfterProcessingTime(TriggerFn)::on_element(self, element, window, context) in 
> trigger.py_:
>  
> {code:python}
> context.set_timer(
>     '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
> {code}
> This overwrites the previously set timer every time a new element arrives, 
> so with a constant influx of elements the trigger fires only once no 
> element has arrived for X seconds.
>  
>  
> Please let me know if I understood the documentation correctly, and if I 
> can help further.
>  
> Thank you,
>  
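
The timer behaviour described in the report can be illustrated with a small stand-alone simulation. This is only a sketch in plain Python, not Beam code: the function name simulate_firings and its parameters are hypothetical, and it assumes a single timer per window that is re-armed by the next element after each firing. It compares resetting the timer on every element (what the quoted set_timer call is reported to do) with arming it only when no timer is pending (what the reporter expected).

```python
from typing import List, Optional


def simulate_firings(arrivals: List[float], delay: float,
                     reset_on_each_element: bool) -> List[float]:
    """Return the processing times at which a single-timer trigger fires.

    arrivals: element arrival times in seconds, sorted ascending.
    delay: the X in AfterProcessingTime(X).
    reset_on_each_element: True models the reported behaviour (every element
        overwrites the timer); False arms the timer only if none is pending.
    """
    firings: List[float] = []
    deadline: Optional[float] = None
    for t in arrivals:
        # Fire a pending timer whose deadline passed before this element.
        if deadline is not None and deadline <= t:
            firings.append(deadline)
            deadline = None
        # Arm (or overwrite) the timer for this element.
        if deadline is None or reset_on_each_element:
            deadline = t + delay
    # Once the input stops, any pending timer eventually fires.
    if deadline is not None:
        firings.append(deadline)
    return firings


# One element per second for 20 seconds, with a 5 second delay.
arrivals = [float(i) for i in range(21)]
print(simulate_firings(arrivals, 5.0, reset_on_each_element=True))
print(simulate_firings(arrivals, 5.0, reset_on_each_element=False))
```

Under these assumptions, the resetting policy produces a single firing, 5 seconds after the last element, while the arm-once policy fires every 5 seconds while elements keep arriving.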



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-6177) AfterProcessingTime not firing

2020-06-16 Thread Beam JIRA Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Beam JIRA Bot updated BEAM-6177:

Labels:   (was: stale-P2)

> AfterProcessingTime not firing
> --
>
>                 Key: BEAM-6177
>                 URL: https://issues.apache.org/jira/browse/BEAM-6177
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.8.0
>            Reporter: Arnaud T
>            Priority: P3
>


[jira] [Updated] (BEAM-6177) AfterProcessingTime not firing

2020-06-01 Thread Beam JIRA Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-6177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Beam JIRA Bot updated BEAM-6177:

Labels: stale-P2  (was: )

> AfterProcessingTime not firing
> --
>
>                 Key: BEAM-6177
>                 URL: https://issues.apache.org/jira/browse/BEAM-6177
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.8.0
>            Reporter: Arnaud T
>            Priority: P2
>              Labels: stale-P2
>