[ https://issues.apache.org/jira/browse/BEAM-7639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17137237#comment-17137237 ]
Beam JIRA Bot commented on BEAM-7639:
-------------------------------------

This issue was marked "stale-P2" and has not received a public comment in 14 days. It is now automatically moved to P3. If you are still affected by it, you can comment and move it back to P2.

> Intermittent empty accumulator values in extractOutput of Combine.perKey on Dataflow
> -------------------------------------------------------------------------------------
>
>                 Key: BEAM-7639
>                 URL: https://issues.apache.org/jira/browse/BEAM-7639
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow
>    Affects Versions: 2.10.0
>            Reporter: Piotr Szczepanik
>            Priority: P3
>
> We are using Spotify's scio 0.7.2, which is built on Apache Beam 2.10.0, on Google Dataflow in streaming mode with fixed windows.
>
> With these versions we have observed strange and, unfortunately, intermittent behaviour of the Combine.perKey transform when used as a reduce operation, e.g. emitting the maximum value per key, or the value derived from the last element with that key in the window.
>
> Such reductions are implemented in scio as a Combine.CombineFn whose accumulator is created as an empty ArrayList, with extractOutput performing the actual reduction and returning the output value (see the sketch after this quoted description).
>
> This works well when the combine accumulator is non-empty at trigger time, and my understanding is that no triggers should fire if no input messages were processed for a given key in a given window. Conversely, if a trigger does fire, I think we may assume that there was at least one event with that key in that window and that it is in the accumulator.
>
> The transform is part of a job of 40-50 transforms that consumes messages from two different PubSub topics, then transforms, windows, and combines them, and finally joins them to emit output messages to a PubSub topic. Messages are pulled from the input topics at 5-300 per second, depending on the time of day.
>
> For 6+ months we ran this pipeline split into 3 separate jobs and observed no similar problems, but that setup was not optimal, as each of those jobs used only 10-30% of worker CPU. It is only after we combined those separate jobs into one that *we started observing exceptions* in the step using this transform, whose direct cause is an empty accumulator at the time the window triggers fire. Those errors happened on subscriptions that had 1 hour of retention set, and the CPUs were quite stressed at the time.
>
> We tried changing the machine type to a larger one (“-n2” -> “-n4”) and an hour of retention was consumed without errors. After another successful try with 3 hours of retention, we tried consuming 6 hours of retention, which again failed.
>
> We have found similar issues in scio's bug tracker:
> [https://github.com/spotify/scio/issues/778]
> [https://github.com/spotify/scio/issues/1620]
>
> The workaround proposed there is to use a custom `aggregateByKey` transform, which is also based on Combine.perKey but takes a `zero` value that is output when the accumulator is empty (sketched below). We applied this workaround, but it is not optimal, as in some cases there is no good default value, e.g. for the last/first message in a window.
>
> While searching through Beam's JIRA I found an issue that may be similar to ours: https://issues.apache.org/jira/browse/BEAM-7614
>
> I assume these issues happen when CPU, memory, or both are stressed during the catch-up phase after starting a job with some retention backlog to consume.
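For concreteness, here is a minimal Beam (Java) sketch of the pattern the report describes; this is not scio's actual code, and the LastValueFn name, the String element type, and the last-value semantics are illustrative assumptions:

{code:java}
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.Combine;

// Illustrative only: a reduction in the style the report describes. The
// accumulator starts as an empty ArrayList with no "zero" element, and
// extractOutput performs the actual reduction, so an unexpectedly empty
// accumulator at trigger time makes extractOutput fail.
public class LastValueFn extends Combine.CombineFn<String, List<String>, String> {

  @Override
  public List<String> createAccumulator() {
    return new ArrayList<>(); // empty accumulator
  }

  @Override
  public List<String> addInput(List<String> accum, String input) {
    accum.clear(); // keep only the most recently seen element
    accum.add(input);
    return accum;
  }

  @Override
  public List<String> mergeAccumulators(Iterable<List<String>> accums) {
    // Merge order is not deterministic; picking any non-empty accumulator
    // is good enough for this sketch.
    List<String> merged = createAccumulator();
    for (List<String> a : accums) {
      if (!a.isEmpty()) {
        merged = a;
      }
    }
    return merged;
  }

  @Override
  public String extractOutput(List<String> accum) {
    if (accum.isEmpty()) {
      // The intermittent failure the report observes on Dataflow: a
      // trigger fired for a key/window whose accumulator is empty.
      throw new IllegalStateException("empty accumulator at trigger time");
    }
    return accum.get(accum.size() - 1);
  }
}
{code}

The report's expectation is that extractOutput never sees the empty list, because a trigger should only fire for a key/window that actually received input.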
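And a rough sketch of how such a CombineFn sits in a streaming pipeline of the shape the report describes; the subscription name, window size, and keying scheme are all assumptions, and the real job's 40-50 transforms and join are omitted:

{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;

public class LastValuePipeline {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    PCollection<KV<String, String>> lastPerKey = p
        // Subscription name is a placeholder, not from the report.
        .apply(PubsubIO.readStrings()
            .fromSubscription("projects/my-project/subscriptions/input-sub"))
        // The report says "fixed windows"; the 1-minute size is assumed.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        // Hypothetical keying: first comma-separated field of the message.
        .apply(WithKeys.of((String msg) -> msg.split(",", 2)[0])
            .withKeyType(TypeDescriptors.strings()))
        // The failing stage: a per-key combine whose extractOutput assumes
        // a non-empty accumulator.
        .apply(Combine.perKey(new LastValueFn()));

    p.run();
  }
}
{code}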
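The workaround from the linked scio issues, expressed in the same illustrative Java terms (scio's actual `aggregateByKey` is Scala and differs in detail), is the same combine with extractOutput falling back to a caller-supplied zero instead of failing:

{code:java}
import java.util.List;

// Sketch of the workaround's idea, reusing LastValueFn from above: the
// only change is that extractOutput returns a caller-supplied "zero"
// instead of failing when the accumulator is empty. As the report notes,
// the drawback is that reductions such as first/last message in window
// have no meaningful zero value.
public class LastValueOrZeroFn extends LastValueFn {

  private final String zero;

  public LastValueOrZeroFn(String zero) {
    this.zero = zero;
  }

  @Override
  public String extractOutput(List<String> accum) {
    return accum.isEmpty() ? zero : accum.get(accum.size() - 1);
  }
}
{code}

Dropping this in is then a one-line change in the pipeline: Combine.perKey(new LastValueOrZeroFn("")) instead of Combine.perKey(new LastValueFn()).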