[ https://issues.apache.org/jira/browse/SPARK-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15145625#comment-15145625 ]

Bryan Cutler commented on SPARK-10086:
--------------------------------------

I was able to track down the cause of these failures, so here is an update with 
what I found.  The test {{StreamingKMeansTest.test_trainOn_predictOn}} has two 
{{DStream.foreachRDD}} output operations: one in the call to 
{{StreamingKMeans.trainOn}}, and one with {{collect}} whose parent {{DStream}} is 
the {{PythonTransformedDStream}} returned from {{StreamingKMeans.predictOn}}.  As 
a result, two jobs are generated for each batch.  When the {{DStream}} jobs are 
generated, there is nothing to compute at generation time for the first job, the 
one that updates the model.  To generate the second job, 
{{PythonTransformedDStream.compute}} is called, which performs a 
{{PythonTransformFunction}} callback that creates a {{PythonRDD}} and serializes 
the mapped predict function into a command containing the current model.
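
To make this concrete, here is a rough sketch of that structure (not the actual 
test code; the local master, the {{queueStream}} input, and the {{results}} list 
are my own stand-ins) showing the two output operations that produce two jobs per 
batch:

{code}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import StreamingKMeans

sc = SparkContext("local[2]", "SPARK-10086-sketch")
ssc = StreamingContext(sc, 1)

batches = [[[-0.5], [0.6], [0.8]], [[0.2], [-0.1], [0.3]]]
stream = ssc.queueStream([sc.parallelize(b) for b in batches])

stkm = StreamingKMeans(decayFactor=0.0, k=2)
stkm.setInitialCenters([[0.0], [1.0]], [1.0, 1.0])

# Output operation 1: trainOn registers a foreachRDD that updates the model.
stkm.trainOn(stream)

# Output operation 2: predictOn returns a transformed DStream; collecting its
# contents registers a second foreachRDD, so two jobs run for each batch.
results = []
stkm.predictOn(stream).foreachRDD(lambda rdd: results.append(rdd.collect()))

ssc.start()
ssc.awaitTerminationOrTimeout(10)
ssc.stop(stopSparkContext=True, stopGraceFully=True)
print(results)
{code}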

Next, the two jobs are scheduled in order: first the model update, then the 
collect of the predicted results.  At this point there is a race condition 
between completing the model update and generating the next set of jobs, which 
happens in a different thread.  If the update is delayed long enough, the next 
set of jobs is generated first and the old model is serialized into the 
{{PythonRDD}} command again.  The prediction then runs against this stale model, 
causing the test failure.

To sum it up, the underlying issue is that a function can be serialized with a 
value before the job that updates that value has run.  This does not appear to 
be a problem in the Scala code, because the closure cleaner runs just before the 
job is executed and therefore picks up the updated values.
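
The sketch below is only an analogy in plain Python (no Spark code; all names 
are made up): one thread pickles the current value into a "command" while 
another thread updates the value slightly later, mirroring the command being 
built before the model update completes.

{code}
import pickle
import threading
import time

model = {"centers": [[0.0], [1.0]]}   # stand-in for the current model on the driver
commands = []

def generate_jobs():
    # "Job generation": the predict command is built by pickling the model's
    # current value right now.
    commands.append(pickle.dumps(model["centers"]))

def update_model():
    # "Model update" job: it finishes after the command was already built.
    time.sleep(0.1)
    model["centers"] = [[0.3], [0.6]]

threads = [threading.Thread(target=generate_jobs),
           threading.Thread(target=update_model)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The command still carries the old centers, so the "predict" would run against
# a stale model, just like in the failing test.
print(pickle.loads(commands[0]))   # [[0.0], [1.0]]
{code}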

So far, the best solution I can think of is to somehow delay the serialization 
of the model until it is needed, but I believe this would involve some big 
changes in {{PythonRDD}}, as would any other solution I could come up with.  Is 
this something worth doing to correct the problem, or is there an easier fix 
that I am not seeing?  It is not just a {{StreamingKMeans}} issue; it would 
affect any PySpark streaming application with a similar structure.
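
To make the "delay serialization" idea concrete, here is a hypothetical 
plain-Python sketch ({{ModelHolder}}, {{PredictCommand}}, and the lazy factory 
are all made up; this is not how {{PythonRDD}} is structured today) contrasting 
building the command eagerly at job-generation time with deferring it until the 
job runs:

{code}
import pickle

class PredictCommand(object):
    """Hypothetical stand-in for the pickled command shipped with a PythonRDD."""
    def __init__(self, centers):
        self.centers = centers          # the model state baked into the command
    def __call__(self, point):
        # Stand-in for the real predict; it just reports which centers it carries.
        return self.centers

class ModelHolder(object):
    """Hypothetical mutable stand-in for the Python-side model."""
    def __init__(self, centers):
        self.centers = centers

holder = ModelHolder([[0.0], [1.0]])

# Eager (roughly the current behaviour): the centers are pickled into the
# command when the job is generated.
eager_command = pickle.dumps(PredictCommand(holder.centers))

# Lazy (the proposed idea): keep only a factory and build the command when the
# job actually runs, so the latest centers are the ones that get serialized.
make_command = lambda: pickle.dumps(PredictCommand(holder.centers))

holder.centers = [[0.3], [0.6]]   # the model update lands in between

print(pickle.loads(eager_command)(None))    # old centers: [[0.0], [1.0]]
print(pickle.loads(make_command())(None))   # updated centers: [[0.3], [0.6]]
{code}

In {{PythonRDD}} terms this would mean deferring creation of the pickled command 
until the job is actually run, which is the part I expect would require the 
larger changes mentioned above.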

I am attaching some simplified code used to reproduce the issue.  I also have a 
similar Scala version that produces the expected results.

> Flaky StreamingKMeans test in PySpark
> -------------------------------------
>
>                 Key: SPARK-10086
>                 URL: https://issues.apache.org/jira/browse/SPARK-10086
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib, PySpark, Streaming, Tests
>    Affects Versions: 1.5.0
>            Reporter: Joseph K. Bradley
>            Priority: Critical
>
> Here's a report on investigating test failures in StreamingKMeans in PySpark. 
> (See Jenkins links below.)
> It is a StreamingKMeans test which trains on a DStream with 2 batches and 
> then tests on those same 2 batches.  It fails here: 
> [https://github.com/apache/spark/blob/1968276af0f681fe51328b7dd795bd21724a5441/python/pyspark/mllib/tests.py#L1144]
> I recreated the same test, with variants training on: (1) the original 2 
> batches, (2) just the first batch, (3) just the second batch, and (4) neither 
> batch.  Here is code which avoids Streaming altogether to identify what 
> batches were processed.
> {code}
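> # Note: meant to be run in the pyspark shell, where sc is the predefined SparkContext.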
> from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
> batches = [[[-0.5], [0.6], [0.8]], [[0.2], [-0.1], [0.3]]]
> batches = [sc.parallelize(batch) for batch in batches]
> stkm = StreamingKMeans(decayFactor=0.0, k=2)
> stkm.setInitialCenters([[0.0], [1.0]], [1.0, 1.0])
> # Train
> def update(rdd):
>     stkm._model.update(rdd, stkm._decayFactor, stkm._timeUnit)
> # Remove one or both of these lines to test skipping batches.
> update(batches[0])
> update(batches[1])
> # Test
> def predict(rdd):
>     return stkm._model.predict(rdd)
> predict(batches[0]).collect()
> predict(batches[1]).collect()
> {code}
> *Results*:
> {code}
> ####################### EXPECTED
> [0, 1, 1]
> [1, 0, 1]
> ####################### Skip batch 0
> [1, 0, 0]
> [0, 1, 0]
> ####################### Skip batch 1
> [0, 1, 1]
> [1, 0, 1]
> ####################### Skip both batches  (This is what we see in the test failures.)
> [0, 1, 1]
> [0, 0, 0]
> {code}
> Skipping both batches reproduces the failure.  There is no randomness in the 
> StreamingKMeans algorithm (since initial centers are fixed, not randomized).
> CC: [~tdas] [~freeman-lab] [~mengxr]
> Failure message:
> {code}
> ======================================================================
> FAIL: test_trainOn_predictOn (__main__.StreamingKMeansTest)
> Test that prediction happens on the updated model.
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/mllib/tests.py", line 1147, in test_trainOn_predictOn
>     self._eventually(condition, catch_assertions=True)
>   File "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/mllib/tests.py", line 123, in _eventually
>     raise lastValue
>   File "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/mllib/tests.py", line 114, in _eventually
>     lastValue = condition()
>   File "/home/jenkins/workspace/SparkPullRequestBuilder@3/python/pyspark/mllib/tests.py", line 1144, in condition
>     self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])
> AssertionError: Lists differ: [[0, 1, 1], [0, 0, 0]] != [[0, 1, 1], [1, 0, 1]]
> First differing element 1:
> [0, 0, 0]
> [1, 0, 1]
> - [[0, 1, 1], [0, 0, 0]]
> ?                 ^^^^
> + [[0, 1, 1], [1, 0, 1]]
> ?              +++   ^
> ----------------------------------------------------------------------
> Ran 62 tests in 164.188s
> {code}


