[ 
https://issues.apache.org/jira/browse/BEAM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aviem Zur reassigned BEAM-2359:
-------------------------------

    Assignee: Aviem Zur  (was: Amit Sela)

> SparkTimerInternals inputWatermarkTime does not get updated in cluster mode
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-2359
>                 URL: https://issues.apache.org/jira/browse/BEAM-2359
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Aviem Zur
>            Assignee: Aviem Zur
>             Fix For: 2.1.0
>
>
> {{SparkTimerInternals#inputWatermarkTime}} does not get updated in cluster 
> mode.
> As a result, windows never close, in-memory state grows without bound, and 
> processing time keeps increasing until the application eventually crashes. 
> Triggers based on the watermark also never fire.
> The root cause is a call from within the {{updateStateByKey}} operation in 
> [SparkGroupAlsoByWindowViaWindowSet|https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java#L241-L242]
> that reads a static reference to a {{GlobalWatermarkHolder}} broadcast 
> variable. In cluster mode each executor JVM has its own copy of this static 
> reference, and it is null there. This works in local mode because the 
> executor and the driver share the same JVM.
> Alternative solutions (and the viability of each):
> * -Broadcast variable passed to the {{updateStateByKey}} operator- - Not 
> viable. Even if the broadcast is used correctly, it cannot work from within 
> {{updateStateByKey}}: that is a {{DStream}} operator, not an {{RDD}} 
> operator, so the broadcast is not updated every microbatch and keeps its 
> initial value.
> * -Broadcast variable to update the data in an additional transform- - Create 
> an additional transform on the {{DStream}}'s RDDs prior to the {{DStream}} 
> operator {{updateStateByKey}} and use a broadcast, which will be updated 
> since this is an {{RDD}} operator. Add this value to the keyed datum itself 
> so that it is available in {{updateStateByKey}}. Not viable: this only 
> updates keys that received new data in the microbatch, but the watermark 
> must also advance for keys that received none.
> * -Broadcast variable to update a static reference- - Create an additional 
> transform on the {{DStream}}'s RDDs prior to the {{DStream}} operator 
> {{updateStateByKey}} and use a broadcast which will be updated (since this is 
> an {{RDD}} operator), and set this value in a static reference within the 
> executor. Not viable since we cannot ensure that all executors will receive 
> partitions to process in each microbatch.
> * Server to be polled lazily every microbatch from within the 
> {{updateStateByKey}} operator - Spin up a server on some configured port on 
> the driver which will serve the current watermarks upon request. Lazily poll 
> this value every microbatch from within the {{updateStateByKey}} operator and 
> update a static reference within the executor. Viable, but it does not use 
> Spark-native operations, adds code to maintain, and imposes an operational 
> cost on the user (opening ports in firewalls, etc.).
> * Drop/register watermarks as a block in BlockManager and request remote 
> version from within the {{updateStateByKey}} operator - Update watermarks as 
> a block in the BlockManager on the driver by dropping and re-registering the 
> block every microbatch. Lazily poll this value every microbatch from within 
> the {{updateStateByKey}} operator and update a static reference within the 
> executor. Viable, less "ugly" than the server version, and with a lower 
> operational cost.
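
The last two options share one pattern: fetch the watermarks lazily, at most
once per microbatch, and keep them in a static reference on the executor. A
minimal sketch of that pattern follows, in plain Java with no Spark
dependency. The class name WatermarkCache, the batchTime parameter, and the
fetchFromDriver supplier are all hypothetical and are not taken from the Beam
code base; the supplier stands in for whichever remote call is chosen (the
server request or the BlockManager lookup).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical executor-side cache; not part of Beam or Spark. */
final class WatermarkCache {
    // Static, so there is one copy per JVM. On a fresh executor it starts
    // empty, which is why it has to be filled lazily from the driver.
    private static Map<Integer, Long> cached = new HashMap<>();
    private static long cachedBatchTime = Long.MIN_VALUE;

    private WatermarkCache() {}

    /**
     * Returns the watermarks for the given microbatch. The supplier is invoked
     * only when batchTime is newer than the batch already cached, so each JVM
     * fetches at most once per microbatch. A call with an older batchTime
     * returns the newer cached value unchanged.
     */
    static synchronized Map<Integer, Long> get(
            long batchTime, Supplier<Map<Integer, Long>> fetchFromDriver) {
        if (batchTime > cachedBatchTime) {
            cached = fetchFromDriver.get(); // assumed remote call to the driver
            cachedBatchTime = batchTime;
        }
        return cached;
    }
}
```

Under these assumptions, every key processed in the same microbatch on the
same executor would call WatermarkCache.get with that batch's time and share
a single fetch, including keys that received no new data.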



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
