[ https://issues.apache.org/jira/browse/SPARK-22163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194228#comment-16194228 ]

Sean Owen commented on SPARK-22163:
-----------------------------------

This is, at least, a little more specific. The problem is this: "5. After Spark 
Thread #3 is done, the driver does other processing to finish the current 
batch. In my case, it updates a list of objects." This is going to be the 
source of the problem. Spark can't and won't modify your own app objects; you 
must be doing it. It's not clear from your text, but you imply this happens 
outside of Spark's threads, in an app thread you spawn. You may believe you're 
waiting correctly for Spark to finish something before you mutate the list, but 
in general you can't coordinate with Spark's internal threads from your app.
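
For illustration, here is a minimal sketch of the pattern that produces exactly 
this stack trace. The names and the mutating thread are hypothetical, not taken 
from your app:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object Repro {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(
          new SparkConf().setMaster("local[2]").setAppName("repro"), Seconds(5))

        // Driver-side mutable state. The filter closure below captures it,
        // so Spark must serialize it whenever it prepares tasks.
        val lookup = new java.util.ArrayList[String]()

        ssc.socketTextStream("localhost", 9999)
          .filter(line => lookup.contains(line))
          .print()

        ssc.start()

        // An app thread mutating the captured list after start() races with
        // that serialization; java.util.ArrayList.writeObject then throws
        // ConcurrentModificationException at an unpredictable point.
        new Thread(new Runnable {
          def run(): Unit = while (true) { lookup.add("x"); lookup.remove(0) }
        }).start()

        ssc.awaitTermination()
      }
    }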

If you read the stack trace, it's not related to checkpointing. It's ensuring a 
closure is serializable before executing a stage.
Remember too that there's no need to synchronize reads. This error can only 
happen if something modifies the list, and that's your app. In general you 
wouldn't be modifying objects on the driver in your app, because your driver 
hands over control to Spark with StreamingContext.start(), and then all other 
operations are distributed.
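
If the list only needs to be read inside a transformation, one conventional way 
to make the capture safe, sketched against the hypothetical example above, is 
to freeze an immutable snapshot before any closure captures it:

    import scala.collection.JavaConverters._

    // Freeze the state on the driver before Spark sees it. Only this
    // immutable snapshot is captured and serialized, so later changes to
    // `lookup` cannot race with task serialization.
    val snapshot: Set[String] = lookup.asScala.toSet

    // `stream` stands for the DStream from the sketch above.
    stream.filter(line => snapshot.contains(line)).print()

A broadcast variable achieves the same thing for larger read-only lookup data: 
it is written once and only read thereafter.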

It sounds like you are executing locally (?) and may be trying to modify local 
state on the driver from executors, which happen to be in the same JVM. That's 
another source of modifications, one you may assume happens before or after 
something else, but whose ordering is in fact undefined.
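
For example, a pattern like the following only appears to work because local 
mode runs the executor inside the driver JVM; this is again a hypothetical 
sketch, not code from your app:

    object AppState {
      // Singleton state living in the driver JVM.
      val cache = new java.util.ArrayList[String]()
    }

    stream.foreachRDD { rdd =>
      rdd.foreach { line =>
        // Runs on executors. In local mode the executor shares the driver
        // JVM, so this mutates the very ArrayList the driver reads, at a
        // point that is undefined relative to Spark's serialization of any
        // closure that captures it. On a real cluster, each executor JVM
        // has its own AppState and the driver never sees these updates.
        AppState.cache.add(line)
      }
    }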

This is why we tried to ask for a reproduction, or at least some of your 
source, and that's still not here. Without it, guesses about what your app is 
doing slightly wrong are the best anyone can offer, and you need to 
investigate them. If you want to continue at this point, you'll need to post a 
reproduction.


> Design Issue of Spark Streaming that Causes Random Run-time Exception
> ---------------------------------------------------------------------
>
>                 Key: SPARK-22163
>                 URL: https://issues.apache.org/jira/browse/SPARK-22163
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams, Structured Streaming
>    Affects Versions: 2.2.0
>         Environment: Spark Streaming
> Kafka
> Linux
>            Reporter: Michael N
>
> The application's objects can contain Lists and can be modified dynamically 
> as well.  However, the Spark Streaming framework asynchronously serializes 
> the application's objects as the application runs.  Therefore, it causes a 
> random run-time exception on a List when the Spark Streaming framework 
> happens to serialize the application's objects while the application is 
> modifying a List in its own object.
> In fact, there are multiple bugs reported about
> Caused by: java.util.ConcurrentModificationException
> at java.util.ArrayList.writeObject
> that are permutations of the same root cause. So the design issue of the 
> Spark Streaming framework is that it does this serialization asynchronously.  
> Instead, it should either
> 1. Do this serialization synchronously. This is preferred, since it 
> eliminates the issue completely.  Or
> 2. Allow it to be configured per application whether this serialization is 
> done synchronously or asynchronously, depending on the nature of each 
> application.
> Also, the Spark documentation should describe the conditions that trigger 
> Spark to do this type of serialization asynchronously, so applications can 
> work around them until a fix is provided. 
> ===
> Vadim Semenov and Steve Loughran, per your inquiries in ticket 
> https://issues.apache.org/jira/browse/SPARK-21999, I am posting the reply 
> here because this issue involves Spark's design and not necessarily its code 
> implementation.
> —
> My application does not spin up its own thread. All the threads are 
> controlled by Spark.
> Batch interval = 5 seconds
> Batch #3
> 1. Driver - Spark Thread #1 - starts batch #3 and blocks until all slave 
> threads are done with this batch
> 2. Slave A - Spark Thread #2. Say it takes 10 seconds to complete
> 3. Slave B - Spark Thread #3. Say it takes 1 minute to complete
> 4. Both thread #1 for the driver and thread #2 for Slave A do not jump ahead 
> and process batch #4. Instead, they wait for thread #3 until it is done. => 
> So there is already synchronization among the threads within the same batch. 
> Also, batch to batch is synchronous.
> 5. After Spark Thread #3 is done, the driver does other processing to finish 
> the current batch. In my case, it updates a list of objects.
> The above steps repeat for the next batch #4 and subsequent batches.
> Based on the exception stack trace, it looks like in step 5, Spark has 
> another thread #4 that serializes application objects asynchronously. So it 
> causes random occurrences of ConcurrentModificationException, because the 
> list of objects is being changed by Spark's own thread #1 for the driver.
> So the issue is not that my application "is modifying a collection 
> asynchronously w.r.t. Spark" as Sean kept claiming. Instead, it is Spark's 
> asynchronous operations among its own different threads within the same batch 
> that cause this issue.
> I understand Spark needs to serialize objects for checkpointing purposes. 
> However, since Spark controls all the threads and their synchronization, it 
> is a design issue in Spark that the lack of synchronization between threads 
> #1 and #4 triggers ConcurrentModificationException.  That is the root 
> cause of this issue.
> Further, even if the application does not modify its list of objects, in step 
> 5 the driver could be modifying multiple native objects, say two integers. In 
> thread #1 the driver could have updated integer X but not yet integer Y when 
> Spark's thread #4 asynchronously serializes the application objects. So the 
> persisted serialized data does not match the actual data. This results in a 
> permutation of this issue with a false-positive condition, where the 
> serialized checkpoint data is only partially correct.
> One solution for both issues is to modify Spark's design and allow the 
> serialization of application objects by Spark's thread #4 to be configured 
> per application as either asynchronous or synchronous with Spark's thread 
> #1. That way, it is up to each application to decide based on the 
> nature of its business requirements and needed throughput.


