[ https://issues.apache.org/jira/browse/BEAM-7639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17137237 ]

Beam JIRA Bot commented on BEAM-7639:
-------------------------------------

This issue was marked "stale-P2" and has not received a public comment in 14 
days. It is now automatically moved to P3. If you are still affected by it, you 
can comment and move it back to P2.

> Intermittent empty accumulator values in extractOutput of Combine.perKey on 
> Dataflow
> ------------------------------------------------------------------------------------
>
>                 Key: BEAM-7639
>                 URL: https://issues.apache.org/jira/browse/BEAM-7639
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>    Affects Versions: 2.10.0
>            Reporter: Piotr Szczepanik
>            Priority: P3
>
> We are using Spotify’s scio 0.7.2, which is built on Apache Beam 2.10.0, 
> running on Google Dataflow in streaming mode with fixed windows.
> With these versions we have observed strange and unfortunately 
> intermittent behaviour of the Combine.perKey transform, which we use to 
> perform reduce operations, e.g. emitting the max value per key, or the 
> value derived from the last element for the key in a window.
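>
> For illustration, here is a minimal Java sketch of the shape of such a
> pipeline; the element/key types, the window size and the max reduction
> are illustrative assumptions, not our actual job:
>
>   import org.apache.beam.sdk.transforms.Combine;
>   import org.apache.beam.sdk.transforms.Max;
>   import org.apache.beam.sdk.transforms.windowing.FixedWindows;
>   import org.apache.beam.sdk.transforms.windowing.Window;
>   import org.apache.beam.sdk.values.KV;
>   import org.apache.beam.sdk.values.PCollection;
>   import org.joda.time.Duration;
>
>   // "input" stands for a PCollection<KV<String, Long>> read upstream,
>   // e.g. from PubSub. Fixed windows, then a per-key max reduction.
>   PCollection<KV<String, Long>> maxPerKey = input
>       .apply(Window.<KV<String, Long>>into(
>           FixedWindows.of(Duration.standardMinutes(5))))
>       .apply(Combine.perKey(Max.ofLongs()));
>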
> Such reductions are implemented in scio as a Combine.CombineFn whose 
> accumulator is created as an empty ArrayList, with extractOutput doing 
> the actual reduction and returning the output value.
> This works well when the combine accumulator is non-empty at trigger 
> time, and my understanding is that no trigger should fire if no input 
> messages were processed for a given key in the given window. Conversely, 
> if a trigger does fire, I think we may assume there was at least one 
> event with the given key in the given window, and that it should be in 
> the accumulator.
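>
> A hypothetical reconstruction of that pattern in plain Beam Java (the
> LastValueFn name and the String types are made up for illustration; the
> real scio implementation is generic):
>
>   import java.util.ArrayList;
>   import java.util.List;
>   import org.apache.beam.sdk.transforms.Combine;
>
>   // Accumulator starts as an empty ArrayList; extractOutput does the
>   // actual reduction (here: keep the last element seen).
>   class LastValueFn
>       extends Combine.CombineFn<String, List<String>, String> {
>     @Override
>     public List<String> createAccumulator() {
>       return new ArrayList<>();
>     }
>
>     @Override
>     public List<String> addInput(List<String> acc, String input) {
>       acc.add(input);
>       return acc;
>     }
>
>     @Override
>     public List<String> mergeAccumulators(Iterable<List<String>> accs) {
>       List<String> merged = new ArrayList<>();
>       for (List<String> acc : accs) {
>         merged.addAll(acc);
>       }
>       return merged;
>     }
>
>     @Override
>     public String extractOutput(List<String> acc) {
>       // This is where we fail: acc is intermittently empty even though
>       // a trigger fired, so there is nothing to reduce.
>       return acc.get(acc.size() - 1);
>     }
>   }
>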
> The transform is part of a job consisting of 40-50 transforms that 
> consumes messages from two different PubSub topics, transforming, 
> windowing and combining them, and then joining them to emit output 
> messages to a PubSub topic. Messages from the input topics are pulled at 
> a rate of 5-300 per second, depending on the time of day.
> We ran this pipeline split into 3 separate jobs for 6+ months and 
> observed no similar problems, but that was not optimal, as each of those 
> jobs was using only 10-30% of worker CPU. It was after we combined those 
> separate jobs into one that *we started observing exceptions* in the 
> step where this transform is used, and their direct cause is an empty 
> accumulator at the time window triggers are fired. Those errors happened 
> on subscriptions that had a 1-hour retention set, while the CPUs were 
> quite stressed.
> We tried changing the machine type to a larger one (“-n2” -> “-n4”), and 
> an hour of retention was consumed without errors. After another 
> successful try with 3 hours of retention, we tried consuming 6 hours of 
> retention, which failed again.
> We found similar issues in scio's bug tracker:
> [https://github.com/spotify/scio/issues/778]
> [https://github.com/spotify/scio/issues/1620]
> The workaround proposed there is to use a custom `aggregateByKey` 
> transform, which is also based on Combine.perKey but uses a `zero` value 
> that is output when the accumulator is empty. We used this workaround, 
> but it is not optimal, as in some cases there is no good default value, 
> e.g. the last/first message in a window.
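>
> In terms of the sketch above, the workaround amounts to something like
> the following, where `zero` is a caller-supplied default held by the
> CombineFn:
>
>   @Override
>   public String extractOutput(List<String> acc) {
>     // Fall back to the caller-supplied zero instead of failing when
>     // the accumulator is unexpectedly empty.
>     return acc.isEmpty() ? zero : acc.get(acc.size() - 1);
>   }
>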
> While searching through Beam's Jira I found an issue that may be similar 
> to ours: https://issues.apache.org/jira/browse/BEAM-7614
> I assume that these issues happen when the CPU, memory or both are 
> stressed during the catch-up phase after starting a job with some 
> retention to consume.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
