[ https://issues.apache.org/jira/browse/BEAM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aviem Zur reassigned BEAM-2359:
-------------------------------

    Assignee: Aviem Zur  (was: Amit Sela)

> SparkTimerInternals inputWatermarkTime does not get updated in cluster mode
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-2359
>                 URL: https://issues.apache.org/jira/browse/BEAM-2359
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Aviem Zur
>            Assignee: Aviem Zur
>             Fix For: 2.1.0
>
>
> {{SparkTimerInternals#inputWatermarkTime}} does not get updated in cluster mode.
> As a result, windows never close, state grows in memory without bound, and processing time keeps increasing until the application eventually crashes (watermark-based triggers also never fire).
> The root cause is a call from within the {{updateStateByKey}} operation in [SparkGroupAlsoByWindowViaWindowSet|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java#L241-L242] which tries to access a static reference to a {{GlobalWatermarkHolder}} broadcast variable. In cluster mode this static reference lives in the executor's JVM, where it is null (this works in local mode because the executor and driver share a JVM).
> Alternative solutions (and their viability):
> * -Broadcast variable passed to the {{updateStateByKey}} operator- - Not viable: even if the broadcast were used correctly, broadcast variables cannot be used in this case (from within {{updateStateByKey}}) because {{updateStateByKey}} is a {{DStream}} operator, not an {{RDD}} operator, so the broadcast would not be refreshed every micro-batch and would retain its initial value.
> * -Broadcast variable to update the data in an additional transform- - Create an additional transform on the {{DStream}}'s RDDs prior to the {{DStream}} operator {{updateStateByKey}}, use a broadcast which does get updated (since this is an {{RDD}} operator), and add the value to the keyed datum itself so it is available inside {{updateStateByKey}}. Not viable: this only updates keys for which new data arrived in the micro-batch, but the watermark must also advance for keys with no new data in the micro-batch.
> * -Broadcast variable to update a static reference- - Create the same additional {{RDD}}-level transform with an updated broadcast, but set the value in a static reference within the executor. Not viable: there is no guarantee that every executor receives partitions to process in each micro-batch.
> * Server polled lazily every micro-batch from within the {{updateStateByKey}} operator - Spin up a server on a configured port on the driver which serves the current watermarks upon request. Lazily poll this value every micro-batch from within {{updateStateByKey}} and update a static reference within the executor. Viable, but it does not use Spark-native operations, incurs code maintenance for this, and adds operational cost for the user (open ports in firewalls, etc.).
> * Drop/register watermarks as a block in the BlockManager and fetch the remote version from within the {{updateStateByKey}} operator - Update the watermarks as a block in the BlockManager on the driver by dropping and re-registering the block every micro-batch. Lazily poll this value every micro-batch from within {{updateStateByKey}} and update a static reference within the executor.
> Viable, less "ugly" than the server version, and requires less operational cost.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
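The executor-side half of the chosen drop/re-register approach can be sketched in plain Java, without Spark: {{WatermarkStore}} stands in for the driver-side BlockManager block (re-registered every micro-batch) and {{ExecutorWatermarkCache}} for the executor's lazily refreshed static reference. Both class names are hypothetical illustrations, not Beam or Spark code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for Spark's BlockManager: the driver drops and re-registers
// the watermarks under a well-known block id every micro-batch.
// (Hypothetical class; in the real fix this is BlockManager itself.)
class WatermarkStore {
    private static final Map<String, Long> blocks = new ConcurrentHashMap<>();

    static void publish(long watermarkMillis) {
        // Models "drop the old block, register the new value under the same id".
        blocks.put("broadcast-watermark", watermarkMillis);
    }

    static long fetchRemote() {
        return blocks.getOrDefault("broadcast-watermark", Long.MIN_VALUE);
    }
}

// Executor side: a static reference refreshed lazily, at most once per
// micro-batch, instead of a broadcast variable that never updates inside
// a DStream operator such as updateStateByKey.
class ExecutorWatermarkCache {
    private static volatile long cachedWatermark = Long.MIN_VALUE;
    private static volatile long cachedBatchId = -1L;

    static long watermarkFor(long batchId) {
        if (batchId != cachedBatchId) { // first access in this micro-batch
            cachedWatermark = WatermarkStore.fetchRemote();
            cachedBatchId = batchId;
        }
        return cachedWatermark; // subsequent accesses reuse the cached value
    }
}

public class WatermarkPollSketch {
    public static void main(String[] args) {
        WatermarkStore.publish(1000L);
        System.out.println(ExecutorWatermarkCache.watermarkFor(0)); // 1000
        WatermarkStore.publish(2000L);
        // Same micro-batch: cached value is reused, no remote fetch.
        System.out.println(ExecutorWatermarkCache.watermarkFor(0)); // 1000
        // New micro-batch: the executor polls the store again.
        System.out.println(ExecutorWatermarkCache.watermarkFor(1)); // 2000
    }
}
```

This keeps the per-batch cost to a single remote fetch per executor JVM, which is why the lazy poll sits behind the batch-id check rather than fetching on every key.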