[jira] [Commented] (SPARK-13178) RRDD faces with concurrency issue in case of rdd.zip(rdd).count()

2016-04-22 Thread Sun Rui (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253518#comment-15253518
 ] 

Sun Rui commented on SPARK-13178:
-

This is fixed as the SparkR unit tests can pass after removing the workaround 
for this issue.

> RRDD faces with concurrency issue in case of rdd.zip(rdd).count()
> -
>
> Key: SPARK-13178
> URL: https://issues.apache.org/jira/browse/SPARK-13178
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Reporter: Xusen Yin
>
> In Kmeans algorithm, there is a zip operation before taking samples, i.e. 
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L210,
>  which can be simplified in the following code:
> {code:title=zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> val rdd =  ...
> val rdd2 = rdd.map(x => x)
> rdd.zip(rdd2).count()
> {code}
> However, RRDD fails on this operation with an error of "can only zip rdd with 
> same number of elements" or "stream closed", similar to the JIRA issue: 
> https://issues.apache.org/jira/browse/SPARK-2251
> Inside RRDD, a data stream is used to ingest data from the R side. In the zip 
> operation, zip with self computes each partition twice. So if we zip a 
> HadoopRDD (iris dataset) with itself, we get 
> {code:title=log-from-zip-HadoopRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> we get a pair (6.8, 6.8)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.9, 4.9)
> we get a pair (6.0, 6.0)
> we get a pair (4.7, 4.7)
> we get a pair (5.7, 5.7)
> we get a pair (4.6, 4.6)
> we get a pair (5.5, 5.5)
> we get a pair (5.0, 5.0)
> we get a pair (5.5, 5.5)
> we get a pair (5.4, 5.4)
> we get a pair (5.8, 5.8)
> we get a pair (4.6, 4.6)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (5.4, 5.4)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (4.9, 4.9)
> we get a pair (6.7, 6.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (5.5, 5.5)
> we get a pair (4.3, 4.3)
> we get a pair (5.5, 5.5)
> we get a pair (5.8, 5.8)
> we get a pair (6.1, 6.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.8, 5.8)
> we get a pair (5.4, 5.4)
> we get a pair (5.0, 5.0)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.2, 6.2)
> we get a pair (5.1, 5.1)
> we get a pair (5.1, 5.1)
> we get a pair (4.6, 4.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.8, 5.8)
> we get a pair (5.0, 5.0)
> we get a pair (7.1, 7.1)
> we get a pair (5.0, 5.0)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.5, 6.5)
> we get a pair (5.2, 5.2)
> we get a pair (7.6, 7.6)
> we get a pair (4.7, 4.7)
> we get a pair (4.9, 4.9)
> we get a pair (4.8, 4.8)
> we get a pair (7.3, 7.3)
> we get a pair (5.4, 5.4)
> we get a pair (6.7, 6.7)
> we get a pair (5.2, 5.2)
> we get a pair (7.2, 7.2)
> we get a pair (5.5, 5.5)
> we get a pair (6.5, 6.5)
> we get a pair (4.9, 4.9)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.8, 6.8)
> we get a pair (5.5, 5.5)
> we get a pair (5.7, 5.7)
> we get a pair (4.9, 4.9)
> we get a pair (5.8, 5.8)
> we get a pair (4.4, 4.4)
> we get a pair (6.4, 6.4)
> we get a pair (5.1, 5.1)
> we get a pair (6.5, 6.5)
> we get a pair (5.0, 5.0)
> we get a pair (7.7, 7.7)
> we get a pair (4.5, 4.5)
> we get a pair (7.7, 7.7)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (6.9, 6.9)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (7.7, 7.7)
> we get a pair (6.3, 6.3)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.6, 4.6)
> we get a pair (7.2, 7.2)
> we get a pair (5.3, 5.3)
> we get a pair (6.2, 6.2)
> we get a pair (5.0, 5.0)
> we get a pair (6.1, 6.1)
> we get a pair (7.0, 7.0)
> we get a pair (6.4, 6.4)
> we get a pair (6.4, 6.4)
> we get a pair (7.2, 7.2)
> we get a pair (6.9, 6.9)
> we get a pair (7.4, 7.4)
> we get a pair (5.5, 5.5)
> we get a pair (7.9, 7.9)
> we get a pair (6.5, 6.5)
> we get a pair (6.4, 6.4)
> we get a pair (5.7, 5.7)
> we get a pair (6.3, 6.3)
> we get a pair (6.3, 6.3)
> we get a pair (6.1, 6.1)
> we get a pair (4.9, 4.9)
> we get a pair (7.7, 7.7)
> we get a pair (6.6, 6.6)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pa

[jira] [Commented] (SPARK-13178) RRDD faces with concurrency issue in case of rdd.zip(rdd).count()

2016-04-22 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15253517#comment-15253517
 ] 

Apache Spark commented on SPARK-13178:
--

User 'sun-rui' has created a pull request for this issue:
https://github.com/apache/spark/pull/12606

> RRDD faces with concurrency issue in case of rdd.zip(rdd).count()
> -
>
> Key: SPARK-13178
> URL: https://issues.apache.org/jira/browse/SPARK-13178
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Reporter: Xusen Yin
>
> In Kmeans algorithm, there is a zip operation before taking samples, i.e. 
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L210,
>  which can be simplified in the following code:
> {code:title=zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> val rdd =  ...
> val rdd2 = rdd.map(x => x)
> rdd.zip(rdd2).count()
> {code}
> However, RRDD fails on this operation with an error of "can only zip rdd with 
> same number of elements" or "stream closed", similar to the JIRA issue: 
> https://issues.apache.org/jira/browse/SPARK-2251
> Inside RRDD, a data stream is used to ingest data from the R side. In the zip 
> operation, zip with self computes each partition twice. So if we zip a 
> HadoopRDD (iris dataset) with itself, we get 
> {code:title=log-from-zip-HadoopRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> we get a pair (6.8, 6.8)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.9, 4.9)
> we get a pair (6.0, 6.0)
> we get a pair (4.7, 4.7)
> we get a pair (5.7, 5.7)
> we get a pair (4.6, 4.6)
> we get a pair (5.5, 5.5)
> we get a pair (5.0, 5.0)
> we get a pair (5.5, 5.5)
> we get a pair (5.4, 5.4)
> we get a pair (5.8, 5.8)
> we get a pair (4.6, 4.6)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (5.4, 5.4)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (4.9, 4.9)
> we get a pair (6.7, 6.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (5.5, 5.5)
> we get a pair (4.3, 4.3)
> we get a pair (5.5, 5.5)
> we get a pair (5.8, 5.8)
> we get a pair (6.1, 6.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.8, 5.8)
> we get a pair (5.4, 5.4)
> we get a pair (5.0, 5.0)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.2, 6.2)
> we get a pair (5.1, 5.1)
> we get a pair (5.1, 5.1)
> we get a pair (4.6, 4.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.8, 5.8)
> we get a pair (5.0, 5.0)
> we get a pair (7.1, 7.1)
> we get a pair (5.0, 5.0)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.5, 6.5)
> we get a pair (5.2, 5.2)
> we get a pair (7.6, 7.6)
> we get a pair (4.7, 4.7)
> we get a pair (4.9, 4.9)
> we get a pair (4.8, 4.8)
> we get a pair (7.3, 7.3)
> we get a pair (5.4, 5.4)
> we get a pair (6.7, 6.7)
> we get a pair (5.2, 5.2)
> we get a pair (7.2, 7.2)
> we get a pair (5.5, 5.5)
> we get a pair (6.5, 6.5)
> we get a pair (4.9, 4.9)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.8, 6.8)
> we get a pair (5.5, 5.5)
> we get a pair (5.7, 5.7)
> we get a pair (4.9, 4.9)
> we get a pair (5.8, 5.8)
> we get a pair (4.4, 4.4)
> we get a pair (6.4, 6.4)
> we get a pair (5.1, 5.1)
> we get a pair (6.5, 6.5)
> we get a pair (5.0, 5.0)
> we get a pair (7.7, 7.7)
> we get a pair (4.5, 4.5)
> we get a pair (7.7, 7.7)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (6.9, 6.9)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (7.7, 7.7)
> we get a pair (6.3, 6.3)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.6, 4.6)
> we get a pair (7.2, 7.2)
> we get a pair (5.3, 5.3)
> we get a pair (6.2, 6.2)
> we get a pair (5.0, 5.0)
> we get a pair (6.1, 6.1)
> we get a pair (7.0, 7.0)
> we get a pair (6.4, 6.4)
> we get a pair (6.4, 6.4)
> we get a pair (7.2, 7.2)
> we get a pair (6.9, 6.9)
> we get a pair (7.4, 7.4)
> we get a pair (5.5, 5.5)
> we get a pair (7.9, 7.9)
> we get a pair (6.5, 6.5)
> we get a pair (6.4, 6.4)
> we get a pair (5.7, 5.7)
> we get a pair (6.3, 6.3)
> we get a pair (6.3, 6.3)
> we get a pair (6.1, 6.1)
> we get a pair (4.9, 4.9)
> we get a pair (7.7, 7.7)
> we get a pair (6.6, 6.6)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.

[jira] [Commented] (SPARK-13178) RRDD faces with concurrency issue in case of rdd.zip(rdd).count()

2016-04-20 Thread Shivaram Venkataraman (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15250268#comment-15250268
 ] 

Shivaram Venkataraman commented on SPARK-13178:
---

[~sunrui] [~yinxusen] Now that https://github.com/apache/spark/pull/10947 has 
been merged, is this issue resolved ?

> RRDD faces with concurrency issue in case of rdd.zip(rdd).count()
> -
>
> Key: SPARK-13178
> URL: https://issues.apache.org/jira/browse/SPARK-13178
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Reporter: Xusen Yin
>
> In Kmeans algorithm, there is a zip operation before taking samples, i.e. 
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L210,
>  which can be simplified in the following code:
> {code:title=zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> val rdd =  ...
> val rdd2 = rdd.map(x => x)
> rdd.zip(rdd2).count()
> {code}
> However, RRDD fails on this operation with an error of "can only zip rdd with 
> same number of elements" or "stream closed", similar to the JIRA issue: 
> https://issues.apache.org/jira/browse/SPARK-2251
> Inside RRDD, a data stream is used to ingest data from the R side. In the zip 
> operation, zip with self computes each partition twice. So if we zip a 
> HadoopRDD (iris dataset) with itself, we get 
> {code:title=log-from-zip-HadoopRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> we get a pair (6.8, 6.8)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.9, 4.9)
> we get a pair (6.0, 6.0)
> we get a pair (4.7, 4.7)
> we get a pair (5.7, 5.7)
> we get a pair (4.6, 4.6)
> we get a pair (5.5, 5.5)
> we get a pair (5.0, 5.0)
> we get a pair (5.5, 5.5)
> we get a pair (5.4, 5.4)
> we get a pair (5.8, 5.8)
> we get a pair (4.6, 4.6)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (5.4, 5.4)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (4.9, 4.9)
> we get a pair (6.7, 6.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (5.5, 5.5)
> we get a pair (4.3, 4.3)
> we get a pair (5.5, 5.5)
> we get a pair (5.8, 5.8)
> we get a pair (6.1, 6.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.8, 5.8)
> we get a pair (5.4, 5.4)
> we get a pair (5.0, 5.0)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.2, 6.2)
> we get a pair (5.1, 5.1)
> we get a pair (5.1, 5.1)
> we get a pair (4.6, 4.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.8, 5.8)
> we get a pair (5.0, 5.0)
> we get a pair (7.1, 7.1)
> we get a pair (5.0, 5.0)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.5, 6.5)
> we get a pair (5.2, 5.2)
> we get a pair (7.6, 7.6)
> we get a pair (4.7, 4.7)
> we get a pair (4.9, 4.9)
> we get a pair (4.8, 4.8)
> we get a pair (7.3, 7.3)
> we get a pair (5.4, 5.4)
> we get a pair (6.7, 6.7)
> we get a pair (5.2, 5.2)
> we get a pair (7.2, 7.2)
> we get a pair (5.5, 5.5)
> we get a pair (6.5, 6.5)
> we get a pair (4.9, 4.9)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.8, 6.8)
> we get a pair (5.5, 5.5)
> we get a pair (5.7, 5.7)
> we get a pair (4.9, 4.9)
> we get a pair (5.8, 5.8)
> we get a pair (4.4, 4.4)
> we get a pair (6.4, 6.4)
> we get a pair (5.1, 5.1)
> we get a pair (6.5, 6.5)
> we get a pair (5.0, 5.0)
> we get a pair (7.7, 7.7)
> we get a pair (4.5, 4.5)
> we get a pair (7.7, 7.7)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (6.9, 6.9)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (7.7, 7.7)
> we get a pair (6.3, 6.3)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.6, 4.6)
> we get a pair (7.2, 7.2)
> we get a pair (5.3, 5.3)
> we get a pair (6.2, 6.2)
> we get a pair (5.0, 5.0)
> we get a pair (6.1, 6.1)
> we get a pair (7.0, 7.0)
> we get a pair (6.4, 6.4)
> we get a pair (6.4, 6.4)
> we get a pair (7.2, 7.2)
> we get a pair (6.9, 6.9)
> we get a pair (7.4, 7.4)
> we get a pair (5.5, 5.5)
> we get a pair (7.9, 7.9)
> we get a pair (6.5, 6.5)
> we get a pair (6.4, 6.4)
> we get a pair (5.7, 5.7)
> we get a pair (6.3, 6.3)
> we get a pair (6.3, 6.3)
> we get a pair (6.1, 6.1)
> we get a pair (4.9, 4.9)
> we get a pair (7.7, 7.7)
> we get a pair (6.6, 6.6)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (

[jira] [Commented] (SPARK-13178) RRDD faces with concurrency issue in case of rdd.zip(rdd).count()

2016-02-24 Thread Sun Rui (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15162877#comment-15162877
 ] 

Sun Rui commented on SPARK-13178:
-

Remember to clean the code at 
https://github.com/apache/spark/pull/11124/files#diff-51c07c6af7649f6c021e5a5438e31a4fR122
 when close this JIRA.

> RRDD faces with concurrency issue in case of rdd.zip(rdd).count()
> -
>
> Key: SPARK-13178
> URL: https://issues.apache.org/jira/browse/SPARK-13178
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Reporter: Xusen Yin
>
> In Kmeans algorithm, there is a zip operation before taking samples, i.e. 
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L210,
>  which can be simplified in the following code:
> {code:title=zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> val rdd =  ...
> val rdd2 = rdd.map(x => x)
> rdd.zip(rdd2).count()
> {code}
> However, RRDD fails on this operation with an error of "can only zip rdd with 
> same number of elements" or "stream closed", similar to the JIRA issue: 
> https://issues.apache.org/jira/browse/SPARK-2251
> Inside RRDD, a data stream is used to ingest data from the R side. In the zip 
> operation, zip with self computes each partition twice. So if we zip a 
> HadoopRDD (iris dataset) with itself, we get 
> {code:title=log-from-zip-HadoopRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> we get a pair (6.8, 6.8)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.9, 4.9)
> we get a pair (6.0, 6.0)
> we get a pair (4.7, 4.7)
> we get a pair (5.7, 5.7)
> we get a pair (4.6, 4.6)
> we get a pair (5.5, 5.5)
> we get a pair (5.0, 5.0)
> we get a pair (5.5, 5.5)
> we get a pair (5.4, 5.4)
> we get a pair (5.8, 5.8)
> we get a pair (4.6, 4.6)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (5.4, 5.4)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (4.9, 4.9)
> we get a pair (6.7, 6.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (5.5, 5.5)
> we get a pair (4.3, 4.3)
> we get a pair (5.5, 5.5)
> we get a pair (5.8, 5.8)
> we get a pair (6.1, 6.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.8, 5.8)
> we get a pair (5.4, 5.4)
> we get a pair (5.0, 5.0)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.2, 6.2)
> we get a pair (5.1, 5.1)
> we get a pair (5.1, 5.1)
> we get a pair (4.6, 4.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.8, 5.8)
> we get a pair (5.0, 5.0)
> we get a pair (7.1, 7.1)
> we get a pair (5.0, 5.0)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.5, 6.5)
> we get a pair (5.2, 5.2)
> we get a pair (7.6, 7.6)
> we get a pair (4.7, 4.7)
> we get a pair (4.9, 4.9)
> we get a pair (4.8, 4.8)
> we get a pair (7.3, 7.3)
> we get a pair (5.4, 5.4)
> we get a pair (6.7, 6.7)
> we get a pair (5.2, 5.2)
> we get a pair (7.2, 7.2)
> we get a pair (5.5, 5.5)
> we get a pair (6.5, 6.5)
> we get a pair (4.9, 4.9)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.8, 6.8)
> we get a pair (5.5, 5.5)
> we get a pair (5.7, 5.7)
> we get a pair (4.9, 4.9)
> we get a pair (5.8, 5.8)
> we get a pair (4.4, 4.4)
> we get a pair (6.4, 6.4)
> we get a pair (5.1, 5.1)
> we get a pair (6.5, 6.5)
> we get a pair (5.0, 5.0)
> we get a pair (7.7, 7.7)
> we get a pair (4.5, 4.5)
> we get a pair (7.7, 7.7)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (6.9, 6.9)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (7.7, 7.7)
> we get a pair (6.3, 6.3)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.6, 4.6)
> we get a pair (7.2, 7.2)
> we get a pair (5.3, 5.3)
> we get a pair (6.2, 6.2)
> we get a pair (5.0, 5.0)
> we get a pair (6.1, 6.1)
> we get a pair (7.0, 7.0)
> we get a pair (6.4, 6.4)
> we get a pair (6.4, 6.4)
> we get a pair (7.2, 7.2)
> we get a pair (6.9, 6.9)
> we get a pair (7.4, 7.4)
> we get a pair (5.5, 5.5)
> we get a pair (7.9, 7.9)
> we get a pair (6.5, 6.5)
> we get a pair (6.4, 6.4)
> we get a pair (5.7, 5.7)
> we get a pair (6.3, 6.3)
> we get a pair (6.3, 6.3)
> we get a pair (6.1, 6.1)
> we get a pair (4.9, 4.9)
> we get a pair (7.7, 7.7)
> we get a pair (6.6, 6.6)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (

[jira] [Commented] (SPARK-13178) RRDD faces with concurrency issue in case of rdd.zip(rdd).count()

2016-02-06 Thread Xusen Yin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15136132#comment-15136132
 ] 

Xusen Yin commented on SPARK-13178:
---

Cheers for the good news! :)

> RRDD faces with concurrency issue in case of rdd.zip(rdd).count()
> -
>
> Key: SPARK-13178
> URL: https://issues.apache.org/jira/browse/SPARK-13178
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Reporter: Xusen Yin
>
> In Kmeans algorithm, there is a zip operation before taking samples, i.e. 
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L210,
>  which can be simplified in the following code:
> {code:title=zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> val rdd =  ...
> val rdd2 = rdd.map(x => x)
> rdd.zip(rdd2).count()
> {code}
> However, RRDD fails on this operation with an error of "can only zip rdd with 
> same number of elements" or "stream closed", similar to the JIRA issue: 
> https://issues.apache.org/jira/browse/SPARK-2251
> Inside RRDD, a data stream is used to ingest data from the R side. In the zip 
> operation, zip with self computes each partition twice. So if we zip a 
> HadoopRDD (iris dataset) with itself, we get 
> {code:title=log-from-zip-HadoopRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> we get a pair (6.8, 6.8)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.9, 4.9)
> we get a pair (6.0, 6.0)
> we get a pair (4.7, 4.7)
> we get a pair (5.7, 5.7)
> we get a pair (4.6, 4.6)
> we get a pair (5.5, 5.5)
> we get a pair (5.0, 5.0)
> we get a pair (5.5, 5.5)
> we get a pair (5.4, 5.4)
> we get a pair (5.8, 5.8)
> we get a pair (4.6, 4.6)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (5.4, 5.4)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (4.9, 4.9)
> we get a pair (6.7, 6.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (5.5, 5.5)
> we get a pair (4.3, 4.3)
> we get a pair (5.5, 5.5)
> we get a pair (5.8, 5.8)
> we get a pair (6.1, 6.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.8, 5.8)
> we get a pair (5.4, 5.4)
> we get a pair (5.0, 5.0)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.2, 6.2)
> we get a pair (5.1, 5.1)
> we get a pair (5.1, 5.1)
> we get a pair (4.6, 4.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.8, 5.8)
> we get a pair (5.0, 5.0)
> we get a pair (7.1, 7.1)
> we get a pair (5.0, 5.0)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.5, 6.5)
> we get a pair (5.2, 5.2)
> we get a pair (7.6, 7.6)
> we get a pair (4.7, 4.7)
> we get a pair (4.9, 4.9)
> we get a pair (4.8, 4.8)
> we get a pair (7.3, 7.3)
> we get a pair (5.4, 5.4)
> we get a pair (6.7, 6.7)
> we get a pair (5.2, 5.2)
> we get a pair (7.2, 7.2)
> we get a pair (5.5, 5.5)
> we get a pair (6.5, 6.5)
> we get a pair (4.9, 4.9)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.8, 6.8)
> we get a pair (5.5, 5.5)
> we get a pair (5.7, 5.7)
> we get a pair (4.9, 4.9)
> we get a pair (5.8, 5.8)
> we get a pair (4.4, 4.4)
> we get a pair (6.4, 6.4)
> we get a pair (5.1, 5.1)
> we get a pair (6.5, 6.5)
> we get a pair (5.0, 5.0)
> we get a pair (7.7, 7.7)
> we get a pair (4.5, 4.5)
> we get a pair (7.7, 7.7)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (6.9, 6.9)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (7.7, 7.7)
> we get a pair (6.3, 6.3)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.6, 4.6)
> we get a pair (7.2, 7.2)
> we get a pair (5.3, 5.3)
> we get a pair (6.2, 6.2)
> we get a pair (5.0, 5.0)
> we get a pair (6.1, 6.1)
> we get a pair (7.0, 7.0)
> we get a pair (6.4, 6.4)
> we get a pair (6.4, 6.4)
> we get a pair (7.2, 7.2)
> we get a pair (6.9, 6.9)
> we get a pair (7.4, 7.4)
> we get a pair (5.5, 5.5)
> we get a pair (7.9, 7.9)
> we get a pair (6.5, 6.5)
> we get a pair (6.4, 6.4)
> we get a pair (5.7, 5.7)
> we get a pair (6.3, 6.3)
> we get a pair (6.3, 6.3)
> we get a pair (6.1, 6.1)
> we get a pair (4.9, 4.9)
> we get a pair (7.7, 7.7)
> we get a pair (6.6, 6.6)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.0, 6.0)
> we get a pair (5.9, 5.9)
> we get a pair (6.9,

[jira] [Commented] (SPARK-13178) RRDD faces with concurrency issue in case of rdd.zip(rdd).count()

2016-02-06 Thread Sun Rui (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15135671#comment-15135671
 ] 

Sun Rui commented on SPARK-13178:
-

The root cause is that RRDD.compute() uses some instance variables. If 
compute() is called con-currently, the variables are shared, which breaks 
concurrency. Good news is that this happens to be fixed by 
https://github.com/apache/spark/pull/10947 for SPARK-12972, where each call to 
compute() will create a new instance of RRunner. There is no shared state among 
multiple calls to compute().

> RRDD faces with concurrency issue in case of rdd.zip(rdd).count()
> -
>
> Key: SPARK-13178
> URL: https://issues.apache.org/jira/browse/SPARK-13178
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Reporter: Xusen Yin
>
> In Kmeans algorithm, there is a zip operation before taking samples, i.e. 
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L210,
>  which can be simplified in the following code:
> {code:title=zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> val rdd =  ...
> val rdd2 = rdd.map(x => x)
> rdd.zip(rdd2).count()
> {code}
> However, RRDD fails on this operation with an error of "can only zip rdd with 
> same number of elements" or "stream closed", similar to the JIRA issue: 
> https://issues.apache.org/jira/browse/SPARK-2251
> Inside RRDD, a data stream is used to ingest data from the R side. In the zip 
> operation, zip with self computes each partition twice. So if we zip a 
> HadoopRDD (iris dataset) with itself, we get 
> {code:title=log-from-zip-HadoopRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> we get a pair (6.8, 6.8)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.9, 4.9)
> we get a pair (6.0, 6.0)
> we get a pair (4.7, 4.7)
> we get a pair (5.7, 5.7)
> we get a pair (4.6, 4.6)
> we get a pair (5.5, 5.5)
> we get a pair (5.0, 5.0)
> we get a pair (5.5, 5.5)
> we get a pair (5.4, 5.4)
> we get a pair (5.8, 5.8)
> we get a pair (4.6, 4.6)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (5.4, 5.4)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (4.9, 4.9)
> we get a pair (6.7, 6.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (5.5, 5.5)
> we get a pair (4.3, 4.3)
> we get a pair (5.5, 5.5)
> we get a pair (5.8, 5.8)
> we get a pair (6.1, 6.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.8, 5.8)
> we get a pair (5.4, 5.4)
> we get a pair (5.0, 5.0)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.2, 6.2)
> we get a pair (5.1, 5.1)
> we get a pair (5.1, 5.1)
> we get a pair (4.6, 4.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.8, 5.8)
> we get a pair (5.0, 5.0)
> we get a pair (7.1, 7.1)
> we get a pair (5.0, 5.0)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.5, 6.5)
> we get a pair (5.2, 5.2)
> we get a pair (7.6, 7.6)
> we get a pair (4.7, 4.7)
> we get a pair (4.9, 4.9)
> we get a pair (4.8, 4.8)
> we get a pair (7.3, 7.3)
> we get a pair (5.4, 5.4)
> we get a pair (6.7, 6.7)
> we get a pair (5.2, 5.2)
> we get a pair (7.2, 7.2)
> we get a pair (5.5, 5.5)
> we get a pair (6.5, 6.5)
> we get a pair (4.9, 4.9)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.8, 6.8)
> we get a pair (5.5, 5.5)
> we get a pair (5.7, 5.7)
> we get a pair (4.9, 4.9)
> we get a pair (5.8, 5.8)
> we get a pair (4.4, 4.4)
> we get a pair (6.4, 6.4)
> we get a pair (5.1, 5.1)
> we get a pair (6.5, 6.5)
> we get a pair (5.0, 5.0)
> we get a pair (7.7, 7.7)
> we get a pair (4.5, 4.5)
> we get a pair (7.7, 7.7)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (6.9, 6.9)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (7.7, 7.7)
> we get a pair (6.3, 6.3)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.6, 4.6)
> we get a pair (7.2, 7.2)
> we get a pair (5.3, 5.3)
> we get a pair (6.2, 6.2)
> we get a pair (5.0, 5.0)
> we get a pair (6.1, 6.1)
> we get a pair (7.0, 7.0)
> we get a pair (6.4, 6.4)
> we get a pair (6.4, 6.4)
> we get a pair (7.2, 7.2)
> we get a pair (6.9, 6.9)
> we get a pair (7.4, 7.4)
> we get a pair (5.5, 5.5)
> we get a pair (7.9, 7.9)
> we get a pair (6.5, 6.5)
> we get a pair (6.4, 6.4)
> we get

[jira] [Commented] (SPARK-13178) RRDD faces with concurrency issue in case of rdd.zip(rdd).count()

2016-02-04 Thread Xusen Yin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15133363#comment-15133363
 ] 

Xusen Yin commented on SPARK-13178:
---

Yes, it works, we can use read.json to load a DataFrame that avoids to use 
RRDD. So maybe I need to mark on the K-means algorithm to avoid use RRDD, for 
now.

> RRDD faces with concurrency issue in case of rdd.zip(rdd).count()
> -
>
> Key: SPARK-13178
> URL: https://issues.apache.org/jira/browse/SPARK-13178
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Reporter: Xusen Yin
>
> In Kmeans algorithm, there is a zip operation before taking samples, i.e. 
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L210,
>  which can be simplified in the following code:
> {code:title=zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> val rdd =  ...
> val rdd2 = rdd.map(x => x)
> rdd.zip(rdd2).count()
> {code}
> However, RRDD fails on this operation with an error of "can only zip rdd with 
> same number of elements" or "stream closed", similar to the JIRA issue: 
> https://issues.apache.org/jira/browse/SPARK-2251
> Inside RRDD, a data stream is used to ingest data from the R side. In the zip 
> operation, zip with self computes each partition twice. So if we zip a 
> HadoopRDD (iris dataset) with itself, we get 
> {code:title=log-from-zip-HadoopRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> we get a pair (6.8, 6.8)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.9, 4.9)
> we get a pair (6.0, 6.0)
> we get a pair (4.7, 4.7)
> we get a pair (5.7, 5.7)
> we get a pair (4.6, 4.6)
> we get a pair (5.5, 5.5)
> we get a pair (5.0, 5.0)
> we get a pair (5.5, 5.5)
> we get a pair (5.4, 5.4)
> we get a pair (5.8, 5.8)
> we get a pair (4.6, 4.6)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (5.4, 5.4)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (4.9, 4.9)
> we get a pair (6.7, 6.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (5.5, 5.5)
> we get a pair (4.3, 4.3)
> we get a pair (5.5, 5.5)
> we get a pair (5.8, 5.8)
> we get a pair (6.1, 6.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.8, 5.8)
> we get a pair (5.4, 5.4)
> we get a pair (5.0, 5.0)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.2, 6.2)
> we get a pair (5.1, 5.1)
> we get a pair (5.1, 5.1)
> we get a pair (4.6, 4.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.8, 5.8)
> we get a pair (5.0, 5.0)
> we get a pair (7.1, 7.1)
> we get a pair (5.0, 5.0)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.5, 6.5)
> we get a pair (5.2, 5.2)
> we get a pair (7.6, 7.6)
> we get a pair (4.7, 4.7)
> we get a pair (4.9, 4.9)
> we get a pair (4.8, 4.8)
> we get a pair (7.3, 7.3)
> we get a pair (5.4, 5.4)
> we get a pair (6.7, 6.7)
> we get a pair (5.2, 5.2)
> we get a pair (7.2, 7.2)
> we get a pair (5.5, 5.5)
> we get a pair (6.5, 6.5)
> we get a pair (4.9, 4.9)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.8, 6.8)
> we get a pair (5.5, 5.5)
> we get a pair (5.7, 5.7)
> we get a pair (4.9, 4.9)
> we get a pair (5.8, 5.8)
> we get a pair (4.4, 4.4)
> we get a pair (6.4, 6.4)
> we get a pair (5.1, 5.1)
> we get a pair (6.5, 6.5)
> we get a pair (5.0, 5.0)
> we get a pair (7.7, 7.7)
> we get a pair (4.5, 4.5)
> we get a pair (7.7, 7.7)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (6.9, 6.9)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (7.7, 7.7)
> we get a pair (6.3, 6.3)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.6, 4.6)
> we get a pair (7.2, 7.2)
> we get a pair (5.3, 5.3)
> we get a pair (6.2, 6.2)
> we get a pair (5.0, 5.0)
> we get a pair (6.1, 6.1)
> we get a pair (7.0, 7.0)
> we get a pair (6.4, 6.4)
> we get a pair (6.4, 6.4)
> we get a pair (7.2, 7.2)
> we get a pair (6.9, 6.9)
> we get a pair (7.4, 7.4)
> we get a pair (5.5, 5.5)
> we get a pair (7.9, 7.9)
> we get a pair (6.5, 6.5)
> we get a pair (6.4, 6.4)
> we get a pair (5.7, 5.7)
> we get a pair (6.3, 6.3)
> we get a pair (6.3, 6.3)
> we get a pair (6.1, 6.1)
> we get a pair (4.9, 4.9)
> we get a pair (7.7, 7.7)
> we get a pair (6.6, 6.6)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2

[jira] [Commented] (SPARK-13178) RRDD faces with concurrency issue in case of rdd.zip(rdd).count()

2016-02-04 Thread Xusen Yin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15132718#comment-15132718
 ] 

Xusen Yin commented on SPARK-13178:
---

Thanks! I'll try it.

> RRDD faces with concurrency issue in case of rdd.zip(rdd).count()
> -
>
> Key: SPARK-13178
> URL: https://issues.apache.org/jira/browse/SPARK-13178
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Reporter: Xusen Yin
>
> In Kmeans algorithm, there is a zip operation before taking samples, i.e. 
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L210,
>  which can be simplified in the following code:
> {code:title=zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> val rdd =  ...
> val rdd2 = rdd.map(x => x)
> rdd.zip(rdd2).count()
> {code}
> However, RRDD fails on this operation with an error of "can only zip rdd with 
> same number of elements" or "stream closed", similar to the JIRA issue: 
> https://issues.apache.org/jira/browse/SPARK-2251
> Inside RRDD, a data stream is used to ingest data from the R side. In the zip 
> operation, zip with self computes each partition twice. So if we zip a 
> HadoopRDD (iris dataset) with itself, we get 
> {code:title=log-from-zip-HadoopRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> we get a pair (6.8, 6.8)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.9, 4.9)
> we get a pair (6.0, 6.0)
> we get a pair (4.7, 4.7)
> we get a pair (5.7, 5.7)
> we get a pair (4.6, 4.6)
> we get a pair (5.5, 5.5)
> we get a pair (5.0, 5.0)
> we get a pair (5.5, 5.5)
> we get a pair (5.4, 5.4)
> we get a pair (5.8, 5.8)
> we get a pair (4.6, 4.6)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (5.4, 5.4)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (4.9, 4.9)
> we get a pair (6.7, 6.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (5.5, 5.5)
> we get a pair (4.3, 4.3)
> we get a pair (5.5, 5.5)
> we get a pair (5.8, 5.8)
> we get a pair (6.1, 6.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.8, 5.8)
> we get a pair (5.4, 5.4)
> we get a pair (5.0, 5.0)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.2, 6.2)
> we get a pair (5.1, 5.1)
> we get a pair (5.1, 5.1)
> we get a pair (4.6, 4.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.8, 5.8)
> we get a pair (5.0, 5.0)
> we get a pair (7.1, 7.1)
> we get a pair (5.0, 5.0)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.5, 6.5)
> we get a pair (5.2, 5.2)
> we get a pair (7.6, 7.6)
> we get a pair (4.7, 4.7)
> we get a pair (4.9, 4.9)
> we get a pair (4.8, 4.8)
> we get a pair (7.3, 7.3)
> we get a pair (5.4, 5.4)
> we get a pair (6.7, 6.7)
> we get a pair (5.2, 5.2)
> we get a pair (7.2, 7.2)
> we get a pair (5.5, 5.5)
> we get a pair (6.5, 6.5)
> we get a pair (4.9, 4.9)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.8, 6.8)
> we get a pair (5.5, 5.5)
> we get a pair (5.7, 5.7)
> we get a pair (4.9, 4.9)
> we get a pair (5.8, 5.8)
> we get a pair (4.4, 4.4)
> we get a pair (6.4, 6.4)
> we get a pair (5.1, 5.1)
> we get a pair (6.5, 6.5)
> we get a pair (5.0, 5.0)
> we get a pair (7.7, 7.7)
> we get a pair (4.5, 4.5)
> we get a pair (7.7, 7.7)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (6.9, 6.9)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (7.7, 7.7)
> we get a pair (6.3, 6.3)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.6, 4.6)
> we get a pair (7.2, 7.2)
> we get a pair (5.3, 5.3)
> we get a pair (6.2, 6.2)
> we get a pair (5.0, 5.0)
> we get a pair (6.1, 6.1)
> we get a pair (7.0, 7.0)
> we get a pair (6.4, 6.4)
> we get a pair (6.4, 6.4)
> we get a pair (7.2, 7.2)
> we get a pair (6.9, 6.9)
> we get a pair (7.4, 7.4)
> we get a pair (5.5, 5.5)
> we get a pair (7.9, 7.9)
> we get a pair (6.5, 6.5)
> we get a pair (6.4, 6.4)
> we get a pair (5.7, 5.7)
> we get a pair (6.3, 6.3)
> we get a pair (6.3, 6.3)
> we get a pair (6.1, 6.1)
> we get a pair (4.9, 4.9)
> we get a pair (7.7, 7.7)
> we get a pair (6.6, 6.6)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.0, 6.0)
> we get a pair (5.9, 5.9)
> we get a pair (6.9, 6.9)
> 

[jira] [Commented] (SPARK-13178) RRDD faces with concurrency issue in case of rdd.zip(rdd).count()

2016-02-03 Thread Sun Rui (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15131634#comment-15131634
 ] 

Sun Rui commented on SPARK-13178:
-

[~xusen]  Could you first use a DataFrame created from something like 
read.json(), within which no RRDD is involved? I will spend some time to 
investigate why this issue happens with RRDD

> RRDD faces with concurrency issue in case of rdd.zip(rdd).count()
> -
>
> Key: SPARK-13178
> URL: https://issues.apache.org/jira/browse/SPARK-13178
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Reporter: Xusen Yin
>
> In Kmeans algorithm, there is a zip operation before taking samples, i.e. 
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L210,
>  which can be simplified in the following code:
> {code:title=zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> val rdd =  ...
> val rdd2 = rdd.map(x => x)
> rdd.zip(rdd2).count()
> {code}
> However, RRDD fails on this operation with an error of "can only zip rdd with 
> same number of elements" or "stream closed", similar to the JIRA issue: 
> https://issues.apache.org/jira/browse/SPARK-2251
> Inside RRDD, a data stream is used to ingest data from the R side. In the zip 
> operation, zip with self computes each partition twice. So if we zip a 
> HadoopRDD (iris dataset) with itself, we get 
> {code:title=log-from-zip-HadoopRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> we get a pair (6.8, 6.8)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.9, 4.9)
> we get a pair (6.0, 6.0)
> we get a pair (4.7, 4.7)
> we get a pair (5.7, 5.7)
> we get a pair (4.6, 4.6)
> we get a pair (5.5, 5.5)
> we get a pair (5.0, 5.0)
> we get a pair (5.5, 5.5)
> we get a pair (5.4, 5.4)
> we get a pair (5.8, 5.8)
> we get a pair (4.6, 4.6)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (5.4, 5.4)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (4.9, 4.9)
> we get a pair (6.7, 6.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (5.5, 5.5)
> we get a pair (4.3, 4.3)
> we get a pair (5.5, 5.5)
> we get a pair (5.8, 5.8)
> we get a pair (6.1, 6.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.8, 5.8)
> we get a pair (5.4, 5.4)
> we get a pair (5.0, 5.0)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.2, 6.2)
> we get a pair (5.1, 5.1)
> we get a pair (5.1, 5.1)
> we get a pair (4.6, 4.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.8, 5.8)
> we get a pair (5.0, 5.0)
> we get a pair (7.1, 7.1)
> we get a pair (5.0, 5.0)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.5, 6.5)
> we get a pair (5.2, 5.2)
> we get a pair (7.6, 7.6)
> we get a pair (4.7, 4.7)
> we get a pair (4.9, 4.9)
> we get a pair (4.8, 4.8)
> we get a pair (7.3, 7.3)
> we get a pair (5.4, 5.4)
> we get a pair (6.7, 6.7)
> we get a pair (5.2, 5.2)
> we get a pair (7.2, 7.2)
> we get a pair (5.5, 5.5)
> we get a pair (6.5, 6.5)
> we get a pair (4.9, 4.9)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.8, 6.8)
> we get a pair (5.5, 5.5)
> we get a pair (5.7, 5.7)
> we get a pair (4.9, 4.9)
> we get a pair (5.8, 5.8)
> we get a pair (4.4, 4.4)
> we get a pair (6.4, 6.4)
> we get a pair (5.1, 5.1)
> we get a pair (6.5, 6.5)
> we get a pair (5.0, 5.0)
> we get a pair (7.7, 7.7)
> we get a pair (4.5, 4.5)
> we get a pair (7.7, 7.7)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (6.9, 6.9)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (7.7, 7.7)
> we get a pair (6.3, 6.3)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.6, 4.6)
> we get a pair (7.2, 7.2)
> we get a pair (5.3, 5.3)
> we get a pair (6.2, 6.2)
> we get a pair (5.0, 5.0)
> we get a pair (6.1, 6.1)
> we get a pair (7.0, 7.0)
> we get a pair (6.4, 6.4)
> we get a pair (6.4, 6.4)
> we get a pair (7.2, 7.2)
> we get a pair (6.9, 6.9)
> we get a pair (7.4, 7.4)
> we get a pair (5.5, 5.5)
> we get a pair (7.9, 7.9)
> we get a pair (6.5, 6.5)
> we get a pair (6.4, 6.4)
> we get a pair (5.7, 5.7)
> we get a pair (6.3, 6.3)
> we get a pair (6.3, 6.3)
> we get a pair (6.1, 6.1)
> we get a pair (4.9, 4.9)
> we get a pair (7.7, 7.7)
> we get a pair (6.6, 6.6)
> we get a pair (6.3, 6.3)

[jira] [Commented] (SPARK-13178) RRDD faces with concurrency issue in case of rdd.zip(rdd).count()

2016-02-03 Thread Shivaram Venkataraman (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15131556#comment-15131556
 ] 

Shivaram Venkataraman commented on SPARK-13178:
---

Ah I see - so the problem is that createDataFrame is returning this RRDD which 
on zipping leads to the problem. Is the problem just about closing the stream 
twice ? If that is the case we should probably fix that. 

cc [~sunrui] 

> RRDD faces with concurrency issue in case of rdd.zip(rdd).count()
> -
>
> Key: SPARK-13178
> URL: https://issues.apache.org/jira/browse/SPARK-13178
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Reporter: Xusen Yin
>
> In Kmeans algorithm, there is a zip operation before taking samples, i.e. 
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L210,
>  which can be simplified in the following code:
> {code:title=zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> val rdd =  ...
> val rdd2 = rdd.map(x => x)
> rdd.zip(rdd2).count()
> {code}
> However, RRDD fails on this operation with an error of "can only zip rdd with 
> same number of elements" or "stream closed", similar to the JIRA issue: 
> https://issues.apache.org/jira/browse/SPARK-2251
> Inside RRDD, a data stream is used to ingest data from the R side. In the zip 
> operation, zip with self computes each partition twice. So if we zip a 
> HadoopRDD (iris dataset) with itself, we get 
> {code:title=log-from-zip-HadoopRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> we get a pair (6.8, 6.8)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.9, 4.9)
> we get a pair (6.0, 6.0)
> we get a pair (4.7, 4.7)
> we get a pair (5.7, 5.7)
> we get a pair (4.6, 4.6)
> we get a pair (5.5, 5.5)
> we get a pair (5.0, 5.0)
> we get a pair (5.5, 5.5)
> we get a pair (5.4, 5.4)
> we get a pair (5.8, 5.8)
> we get a pair (4.6, 4.6)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (5.4, 5.4)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (4.9, 4.9)
> we get a pair (6.7, 6.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (5.5, 5.5)
> we get a pair (4.3, 4.3)
> we get a pair (5.5, 5.5)
> we get a pair (5.8, 5.8)
> we get a pair (6.1, 6.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.8, 5.8)
> we get a pair (5.4, 5.4)
> we get a pair (5.0, 5.0)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.2, 6.2)
> we get a pair (5.1, 5.1)
> we get a pair (5.1, 5.1)
> we get a pair (4.6, 4.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.8, 5.8)
> we get a pair (5.0, 5.0)
> we get a pair (7.1, 7.1)
> we get a pair (5.0, 5.0)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.5, 6.5)
> we get a pair (5.2, 5.2)
> we get a pair (7.6, 7.6)
> we get a pair (4.7, 4.7)
> we get a pair (4.9, 4.9)
> we get a pair (4.8, 4.8)
> we get a pair (7.3, 7.3)
> we get a pair (5.4, 5.4)
> we get a pair (6.7, 6.7)
> we get a pair (5.2, 5.2)
> we get a pair (7.2, 7.2)
> we get a pair (5.5, 5.5)
> we get a pair (6.5, 6.5)
> we get a pair (4.9, 4.9)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.8, 6.8)
> we get a pair (5.5, 5.5)
> we get a pair (5.7, 5.7)
> we get a pair (4.9, 4.9)
> we get a pair (5.8, 5.8)
> we get a pair (4.4, 4.4)
> we get a pair (6.4, 6.4)
> we get a pair (5.1, 5.1)
> we get a pair (6.5, 6.5)
> we get a pair (5.0, 5.0)
> we get a pair (7.7, 7.7)
> we get a pair (4.5, 4.5)
> we get a pair (7.7, 7.7)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (6.9, 6.9)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (7.7, 7.7)
> we get a pair (6.3, 6.3)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.6, 4.6)
> we get a pair (7.2, 7.2)
> we get a pair (5.3, 5.3)
> we get a pair (6.2, 6.2)
> we get a pair (5.0, 5.0)
> we get a pair (6.1, 6.1)
> we get a pair (7.0, 7.0)
> we get a pair (6.4, 6.4)
> we get a pair (6.4, 6.4)
> we get a pair (7.2, 7.2)
> we get a pair (6.9, 6.9)
> we get a pair (7.4, 7.4)
> we get a pair (5.5, 5.5)
> we get a pair (7.9, 7.9)
> we get a pair (6.5, 6.5)
> we get a pair (6.4, 6.4)
> we get a pair (5.7, 5.7)
> we get a pair (6.3, 6.3)
> we get a pair (6.3, 6.3)
> we get a pair (6.1, 6.1)
> we get a pair (4.9, 4.9)
> we get a

[jira] [Commented] (SPARK-13178) RRDD faces with concurrency issue in case of rdd.zip(rdd).count()

2016-02-03 Thread Xusen Yin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15131463#comment-15131463
 ] 

Xusen Yin commented on SPARK-13178:
---

We can work around with just adding a cache for the "df". But it is not 
elegant. Or do you have other suggestions to work around?

> RRDD faces with concurrency issue in case of rdd.zip(rdd).count()
> -
>
> Key: SPARK-13178
> URL: https://issues.apache.org/jira/browse/SPARK-13178
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Reporter: Xusen Yin
>
> In Kmeans algorithm, there is a zip operation before taking samples, i.e. 
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L210,
>  which can be simplified in the following code:
> {code:title=zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> val rdd =  ...
> val rdd2 = rdd.map(x => x)
> rdd.zip(rdd2).count()
> {code}
> However, RRDD fails on this operation with an error of "can only zip rdd with 
> same number of elements" or "stream closed", similar to the JIRA issue: 
> https://issues.apache.org/jira/browse/SPARK-2251
> Inside RRDD, a data stream is used to ingest data from the R side. In the zip 
> operation, zip with self computes each partition twice. So if we zip a 
> HadoopRDD (iris dataset) with itself, we get 
> {code:title=log-from-zip-HadoopRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> we get a pair (6.8, 6.8)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.9, 4.9)
> we get a pair (6.0, 6.0)
> we get a pair (4.7, 4.7)
> we get a pair (5.7, 5.7)
> we get a pair (4.6, 4.6)
> we get a pair (5.5, 5.5)
> we get a pair (5.0, 5.0)
> we get a pair (5.5, 5.5)
> we get a pair (5.4, 5.4)
> we get a pair (5.8, 5.8)
> we get a pair (4.6, 4.6)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (5.4, 5.4)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (4.9, 4.9)
> we get a pair (6.7, 6.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (5.5, 5.5)
> we get a pair (4.3, 4.3)
> we get a pair (5.5, 5.5)
> we get a pair (5.8, 5.8)
> we get a pair (6.1, 6.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.8, 5.8)
> we get a pair (5.4, 5.4)
> we get a pair (5.0, 5.0)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.2, 6.2)
> we get a pair (5.1, 5.1)
> we get a pair (5.1, 5.1)
> we get a pair (4.6, 4.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.8, 5.8)
> we get a pair (5.0, 5.0)
> we get a pair (7.1, 7.1)
> we get a pair (5.0, 5.0)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.5, 6.5)
> we get a pair (5.2, 5.2)
> we get a pair (7.6, 7.6)
> we get a pair (4.7, 4.7)
> we get a pair (4.9, 4.9)
> we get a pair (4.8, 4.8)
> we get a pair (7.3, 7.3)
> we get a pair (5.4, 5.4)
> we get a pair (6.7, 6.7)
> we get a pair (5.2, 5.2)
> we get a pair (7.2, 7.2)
> we get a pair (5.5, 5.5)
> we get a pair (6.5, 6.5)
> we get a pair (4.9, 4.9)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.8, 6.8)
> we get a pair (5.5, 5.5)
> we get a pair (5.7, 5.7)
> we get a pair (4.9, 4.9)
> we get a pair (5.8, 5.8)
> we get a pair (4.4, 4.4)
> we get a pair (6.4, 6.4)
> we get a pair (5.1, 5.1)
> we get a pair (6.5, 6.5)
> we get a pair (5.0, 5.0)
> we get a pair (7.7, 7.7)
> we get a pair (4.5, 4.5)
> we get a pair (7.7, 7.7)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (6.9, 6.9)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (7.7, 7.7)
> we get a pair (6.3, 6.3)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.6, 4.6)
> we get a pair (7.2, 7.2)
> we get a pair (5.3, 5.3)
> we get a pair (6.2, 6.2)
> we get a pair (5.0, 5.0)
> we get a pair (6.1, 6.1)
> we get a pair (7.0, 7.0)
> we get a pair (6.4, 6.4)
> we get a pair (6.4, 6.4)
> we get a pair (7.2, 7.2)
> we get a pair (6.9, 6.9)
> we get a pair (7.4, 7.4)
> we get a pair (5.5, 5.5)
> we get a pair (7.9, 7.9)
> we get a pair (6.5, 6.5)
> we get a pair (6.4, 6.4)
> we get a pair (5.7, 5.7)
> we get a pair (6.3, 6.3)
> we get a pair (6.3, 6.3)
> we get a pair (6.1, 6.1)
> we get a pair (4.9, 4.9)
> we get a pair (7.7, 7.7)
> we get a pair (6.6, 6.6)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.4, 6.4)

[jira] [Commented] (SPARK-13178) RRDD faces with concurrency issue in case of rdd.zip(rdd).count()

2016-02-03 Thread Xusen Yin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15131455#comment-15131455
 ] 

Xusen Yin commented on SPARK-13178:
---

I don't zip RRDD with itself. Actually, the bug exists when I calling KMeans 
from R side. I wrote the KMeans for SparkR in this JIRA 
https://issues.apache.org/jira/browse/SPARK-13011 with a code below:

model <- callJStatic("org.apache.spark.ml.api.r.SparkRWrappers", "fitKMeans", 
algorithm,
  x@sdf, iter.max, centers, "Sepal_Length Sepal_Width Petal_Length 
Petal_Width")

In the spark side, I wrote a fitKMeans in 
org.apache.spark.ml.api.r.SparkRWrappers:

def fitKMeans(
  initMode: String,
  df: DataFrame,
  maxIter: Double,
  k: Double,
  columns: String): KMeansModel = {
val assembler = new VectorAssembler().setInputCols(columns.split(" 
")).setOutputCol("features")
val features = assembler.transform(df).select("features")
val kMeans = new KMeans()
  .setInitMode(initMode)
  .setMaxIter(maxIter.toInt)
  .setK(k.toInt)
val model = kMeans.fit(features)
model
  }

The calling of KMeans have the code of rdd.zip(rdd.map(...)), and the rdd is 
derived from RRDD, so I cannot move on without fix it.

> RRDD faces with concurrency issue in case of rdd.zip(rdd).count()
> -
>
> Key: SPARK-13178
> URL: https://issues.apache.org/jira/browse/SPARK-13178
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Reporter: Xusen Yin
>
> In Kmeans algorithm, there is a zip operation before taking samples, i.e. 
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L210,
>  which can be simplified in the following code:
> {code:title=zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> val rdd =  ...
> val rdd2 = rdd.map(x => x)
> rdd.zip(rdd2).count()
> {code}
> However, RRDD fails on this operation with an error of "can only zip rdd with 
> same number of elements" or "stream closed", similar to the JIRA issue: 
> https://issues.apache.org/jira/browse/SPARK-2251
> Inside RRDD, a data stream is used to ingest data from the R side. In the zip 
> operation, zip with self computes each partition twice. So if we zip a 
> HadoopRDD (iris dataset) with itself, we get 
> {code:title=log-from-zip-HadoopRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> we get a pair (6.8, 6.8)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.9, 4.9)
> we get a pair (6.0, 6.0)
> we get a pair (4.7, 4.7)
> we get a pair (5.7, 5.7)
> we get a pair (4.6, 4.6)
> we get a pair (5.5, 5.5)
> we get a pair (5.0, 5.0)
> we get a pair (5.5, 5.5)
> we get a pair (5.4, 5.4)
> we get a pair (5.8, 5.8)
> we get a pair (4.6, 4.6)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (5.4, 5.4)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (4.9, 4.9)
> we get a pair (6.7, 6.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (5.5, 5.5)
> we get a pair (4.3, 4.3)
> we get a pair (5.5, 5.5)
> we get a pair (5.8, 5.8)
> we get a pair (6.1, 6.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.8, 5.8)
> we get a pair (5.4, 5.4)
> we get a pair (5.0, 5.0)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.2, 6.2)
> we get a pair (5.1, 5.1)
> we get a pair (5.1, 5.1)
> we get a pair (4.6, 4.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.8, 5.8)
> we get a pair (5.0, 5.0)
> we get a pair (7.1, 7.1)
> we get a pair (5.0, 5.0)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.5, 6.5)
> we get a pair (5.2, 5.2)
> we get a pair (7.6, 7.6)
> we get a pair (4.7, 4.7)
> we get a pair (4.9, 4.9)
> we get a pair (4.8, 4.8)
> we get a pair (7.3, 7.3)
> we get a pair (5.4, 5.4)
> we get a pair (6.7, 6.7)
> we get a pair (5.2, 5.2)
> we get a pair (7.2, 7.2)
> we get a pair (5.5, 5.5)
> we get a pair (6.5, 6.5)
> we get a pair (4.9, 4.9)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.8, 6.8)
> we get a pair (5.5, 5.5)
> we get a pair (5.7, 5.7)
> we get a pair (4.9, 4.9)
> we get a pair (5.8, 5.8)
> we get a pair (4.4, 4.4)
> we get a pair (6.4, 6.4)
> we get a pair (5.1, 5.1)
> we get a pair (6.5, 6.5)
> we get a pair (5.0, 5.0)
> we get a pair (7.7, 7.7)
> we get a pair (4.5, 4.5)
> we get a pair (7.7, 7.7)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)

[jira] [Commented] (SPARK-13178) RRDD faces with concurrency issue in case of rdd.zip(rdd).count()

2016-02-03 Thread Shivaram Venkataraman (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15131436#comment-15131436
 ] 

Shivaram Venkataraman commented on SPARK-13178:
---

Hmm this is tricky to debug -- A higher level question: Why do we need to 
implement this using RRDD and zip on it ? The RRDD class is deprecated and 
going to go away soon. I thought the KMeans effort would only require wrapping 
the scala algorithm ?

> RRDD faces with concurrency issue in case of rdd.zip(rdd).count()
> -
>
> Key: SPARK-13178
> URL: https://issues.apache.org/jira/browse/SPARK-13178
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Reporter: Xusen Yin
>
> In Kmeans algorithm, there is a zip operation before taking samples, i.e. 
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L210,
>  which can be simplified in the following code:
> {code:title=zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> val rdd =  ...
> val rdd2 = rdd.map(x => x)
> rdd.zip(rdd2).count()
> {code}
> However, RRDD fails on this operation with an error of "can only zip rdd with 
> same number of elements" or "stream closed", similar to the JIRA issue: 
> https://issues.apache.org/jira/browse/SPARK-2251
> Inside RRDD, a data stream is used to ingest data from the R side. In the zip 
> operation, zip with self computes each partition twice. So if we zip a 
> HadoopRDD (iris dataset) with itself, we get 
> {code:title=log-from-zip-HadoopRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> we get a pair (6.8, 6.8)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.9, 4.9)
> we get a pair (6.0, 6.0)
> we get a pair (4.7, 4.7)
> we get a pair (5.7, 5.7)
> we get a pair (4.6, 4.6)
> we get a pair (5.5, 5.5)
> we get a pair (5.0, 5.0)
> we get a pair (5.5, 5.5)
> we get a pair (5.4, 5.4)
> we get a pair (5.8, 5.8)
> we get a pair (4.6, 4.6)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (5.4, 5.4)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (4.9, 4.9)
> we get a pair (6.7, 6.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (5.5, 5.5)
> we get a pair (4.3, 4.3)
> we get a pair (5.5, 5.5)
> we get a pair (5.8, 5.8)
> we get a pair (6.1, 6.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.8, 5.8)
> we get a pair (5.4, 5.4)
> we get a pair (5.0, 5.0)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.2, 6.2)
> we get a pair (5.1, 5.1)
> we get a pair (5.1, 5.1)
> we get a pair (4.6, 4.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.8, 5.8)
> we get a pair (5.0, 5.0)
> we get a pair (7.1, 7.1)
> we get a pair (5.0, 5.0)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.5, 6.5)
> we get a pair (5.2, 5.2)
> we get a pair (7.6, 7.6)
> we get a pair (4.7, 4.7)
> we get a pair (4.9, 4.9)
> we get a pair (4.8, 4.8)
> we get a pair (7.3, 7.3)
> we get a pair (5.4, 5.4)
> we get a pair (6.7, 6.7)
> we get a pair (5.2, 5.2)
> we get a pair (7.2, 7.2)
> we get a pair (5.5, 5.5)
> we get a pair (6.5, 6.5)
> we get a pair (4.9, 4.9)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.8, 6.8)
> we get a pair (5.5, 5.5)
> we get a pair (5.7, 5.7)
> we get a pair (4.9, 4.9)
> we get a pair (5.8, 5.8)
> we get a pair (4.4, 4.4)
> we get a pair (6.4, 6.4)
> we get a pair (5.1, 5.1)
> we get a pair (6.5, 6.5)
> we get a pair (5.0, 5.0)
> we get a pair (7.7, 7.7)
> we get a pair (4.5, 4.5)
> we get a pair (7.7, 7.7)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (6.9, 6.9)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (7.7, 7.7)
> we get a pair (6.3, 6.3)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.6, 4.6)
> we get a pair (7.2, 7.2)
> we get a pair (5.3, 5.3)
> we get a pair (6.2, 6.2)
> we get a pair (5.0, 5.0)
> we get a pair (6.1, 6.1)
> we get a pair (7.0, 7.0)
> we get a pair (6.4, 6.4)
> we get a pair (6.4, 6.4)
> we get a pair (7.2, 7.2)
> we get a pair (6.9, 6.9)
> we get a pair (7.4, 7.4)
> we get a pair (5.5, 5.5)
> we get a pair (7.9, 7.9)
> we get a pair (6.5, 6.5)
> we get a pair (6.4, 6.4)
> we get a pair (5.7, 5.7)
> we get a pair (6.3, 6.3)
> we get a pair (6.3, 6.3)
> we get a pair (6.1, 6.1)
> we get a pair 

[jira] [Commented] (SPARK-13178) RRDD faces with concurrency issue in case of rdd.zip(rdd).count()

2016-02-03 Thread Xusen Yin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15131430#comment-15131430
 ] 

Xusen Yin commented on SPARK-13178:
---

Ping [~mengxr] [~shivaram] to know about the concurrency issue. I am on my way 
to find a solution. It's better to know more from you since I am not the expert 
on this kind of bug.

> RRDD faces with concurrency issue in case of rdd.zip(rdd).count()
> -
>
> Key: SPARK-13178
> URL: https://issues.apache.org/jira/browse/SPARK-13178
> Project: Spark
>  Issue Type: Bug
>  Components: SparkR
>Reporter: Xusen Yin
>
> In Kmeans algorithm, there is a zip operation before taking samples, i.e. 
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L210,
>  which can be simplified in the following code:
> {code:title=zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> val rdd =  ...
> val rdd2 = rdd.map(x => x)
> rdd.zip(rdd2).count()
> {code}
> However, RRDD fails on this operation with an error of "can only zip rdd with 
> same number of elements" or "stream closed", similar to the JIRA issue: 
> https://issues.apache.org/jira/browse/SPARK-2251
> Inside RRDD, a data stream is used to ingest data from the R side. In the zip 
> operation, zip with self computes each partition twice. So if we zip a 
> HadoopRDD (iris dataset) with itself, we get 
> {code:title=log-from-zip-HadoopRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> we get a pair (6.8, 6.8)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.9, 4.9)
> we get a pair (6.0, 6.0)
> we get a pair (4.7, 4.7)
> we get a pair (5.7, 5.7)
> we get a pair (4.6, 4.6)
> we get a pair (5.5, 5.5)
> we get a pair (5.0, 5.0)
> we get a pair (5.5, 5.5)
> we get a pair (5.4, 5.4)
> we get a pair (5.8, 5.8)
> we get a pair (4.6, 4.6)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (5.4, 5.4)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (4.9, 4.9)
> we get a pair (6.7, 6.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (5.5, 5.5)
> we get a pair (4.3, 4.3)
> we get a pair (5.5, 5.5)
> we get a pair (5.8, 5.8)
> we get a pair (6.1, 6.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.8, 5.8)
> we get a pair (5.4, 5.4)
> we get a pair (5.0, 5.0)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.2, 6.2)
> we get a pair (5.1, 5.1)
> we get a pair (5.1, 5.1)
> we get a pair (4.6, 4.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.8, 5.8)
> we get a pair (5.0, 5.0)
> we get a pair (7.1, 7.1)
> we get a pair (5.0, 5.0)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.5, 6.5)
> we get a pair (5.2, 5.2)
> we get a pair (7.6, 7.6)
> we get a pair (4.7, 4.7)
> we get a pair (4.9, 4.9)
> we get a pair (4.8, 4.8)
> we get a pair (7.3, 7.3)
> we get a pair (5.4, 5.4)
> we get a pair (6.7, 6.7)
> we get a pair (5.2, 5.2)
> we get a pair (7.2, 7.2)
> we get a pair (5.5, 5.5)
> we get a pair (6.5, 6.5)
> we get a pair (4.9, 4.9)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.8, 6.8)
> we get a pair (5.5, 5.5)
> we get a pair (5.7, 5.7)
> we get a pair (4.9, 4.9)
> we get a pair (5.8, 5.8)
> we get a pair (4.4, 4.4)
> we get a pair (6.4, 6.4)
> we get a pair (5.1, 5.1)
> we get a pair (6.5, 6.5)
> we get a pair (5.0, 5.0)
> we get a pair (7.7, 7.7)
> we get a pair (4.5, 4.5)
> we get a pair (7.7, 7.7)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (6.9, 6.9)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (7.7, 7.7)
> we get a pair (6.3, 6.3)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.6, 4.6)
> we get a pair (7.2, 7.2)
> we get a pair (5.3, 5.3)
> we get a pair (6.2, 6.2)
> we get a pair (5.0, 5.0)
> we get a pair (6.1, 6.1)
> we get a pair (7.0, 7.0)
> we get a pair (6.4, 6.4)
> we get a pair (6.4, 6.4)
> we get a pair (7.2, 7.2)
> we get a pair (6.9, 6.9)
> we get a pair (7.4, 7.4)
> we get a pair (5.5, 5.5)
> we get a pair (7.9, 7.9)
> we get a pair (6.5, 6.5)
> we get a pair (6.4, 6.4)
> we get a pair (5.7, 5.7)
> we get a pair (6.3, 6.3)
> we get a pair (6.3, 6.3)
> we get a pair (6.1, 6.1)
> we get a pair (4.9, 4.9)
> we get a pair (7.7, 7.7)
> we get a pair (6.6, 6.6)
> we get a pair (6.3, 6.3)
> w