[ https://issues.apache.org/jira/browse/SPARK-13178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Shivaram Venkataraman updated SPARK-13178:
------------------------------------------
    Assignee: Sun Rui

> RRDD faces with concurrency issue in case of rdd.zip(rdd).count()
> -----------------------------------------------------------------
>
>                 Key: SPARK-13178
>                 URL: https://issues.apache.org/jira/browse/SPARK-13178
>             Project: Spark
>          Issue Type: Bug
>          Components: SparkR
>            Reporter: Xusen Yin
>            Assignee: Sun Rui
>             Fix For: 2.0.0
>
>
> In Kmeans algorithm, there is a zip operation before taking samples, i.e.
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L210,
> which can be simplified in the following code:
> {code:title=zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> val rdd = ...
> val rdd2 = rdd.map(x => x)
> rdd.zip(rdd2).count()
> {code}
> However, RRDD fails on this operation with an error of "can only zip rdd with
> same number of elements" or "stream closed", similar to the JIRA issue:
> https://issues.apache.org/jira/browse/SPARK-2251
> Inside RRDD, a data stream is used to ingest data from the R side. In the zip
> operation, zip with self computes each partition twice.
> So if we zip a HadoopRDD (iris dataset) with itself, we get
> {code:title=log-from-zip-HadoopRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> we get a pair (6.8, 6.8)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.9, 4.9)
> we get a pair (6.0, 6.0)
> we get a pair (4.7, 4.7)
> we get a pair (5.7, 5.7)
> we get a pair (4.6, 4.6)
> we get a pair (5.5, 5.5)
> we get a pair (5.0, 5.0)
> we get a pair (5.5, 5.5)
> we get a pair (5.4, 5.4)
> we get a pair (5.8, 5.8)
> we get a pair (4.6, 4.6)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (5.4, 5.4)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (4.9, 4.9)
> we get a pair (6.7, 6.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (5.5, 5.5)
> we get a pair (4.3, 4.3)
> we get a pair (5.5, 5.5)
> we get a pair (5.8, 5.8)
> we get a pair (6.1, 6.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.8, 5.8)
> we get a pair (5.4, 5.4)
> we get a pair (5.0, 5.0)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (5.7, 5.7)
> we get a pair (5.4, 5.4)
> we get a pair (6.2, 6.2)
> we get a pair (5.1, 5.1)
> we get a pair (5.1, 5.1)
> we get a pair (4.6, 4.6)
> we get a pair (5.7, 5.7)
> we get a pair (5.1, 5.1)
> we get a pair (6.3, 6.3)
> we get a pair (4.8, 4.8)
> we get a pair (5.8, 5.8)
> we get a pair (5.0, 5.0)
> we get a pair (7.1, 7.1)
> we get a pair (5.0, 5.0)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.5, 6.5)
> we get a pair (5.2, 5.2)
> we get a pair (7.6, 7.6)
> we get a pair (4.7, 4.7)
> we get a pair (4.9, 4.9)
> we get a pair (4.8, 4.8)
> we get a pair (7.3, 7.3)
> we get a pair (5.4, 5.4)
> we get a pair (6.7, 6.7)
> we get a pair (5.2, 5.2)
> we get a pair (7.2, 7.2)
> we get a pair (5.5, 5.5)
> we get a pair (6.5, 6.5)
> we get a pair (4.9, 4.9)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.8, 6.8)
> we get a pair (5.5, 5.5)
> we get a pair (5.7, 5.7)
> we get a pair (4.9, 4.9)
> we get a pair (5.8, 5.8)
> we get a pair (4.4, 4.4)
> we get a pair (6.4, 6.4)
> we get a pair (5.1, 5.1)
> we get a pair (6.5, 6.5)
> we get a pair (5.0, 5.0)
> we get a pair (7.7, 7.7)
> we get a pair (4.5, 4.5)
> we get a pair (7.7, 7.7)
> we get a pair (4.4, 4.4)
> we get a pair (6.0, 6.0)
> we get a pair (5.0, 5.0)
> we get a pair (6.9, 6.9)
> we get a pair (5.1, 5.1)
> we get a pair (5.6, 5.6)
> we get a pair (4.8, 4.8)
> we get a pair (7.7, 7.7)
> we get a pair (6.3, 6.3)
> we get a pair (5.1, 5.1)
> we get a pair (6.7, 6.7)
> we get a pair (4.6, 4.6)
> we get a pair (7.2, 7.2)
> we get a pair (5.3, 5.3)
> we get a pair (6.2, 6.2)
> we get a pair (5.0, 5.0)
> we get a pair (6.1, 6.1)
> we get a pair (7.0, 7.0)
> we get a pair (6.4, 6.4)
> we get a pair (6.4, 6.4)
> we get a pair (7.2, 7.2)
> we get a pair (6.9, 6.9)
> we get a pair (7.4, 7.4)
> we get a pair (5.5, 5.5)
> we get a pair (7.9, 7.9)
> we get a pair (6.5, 6.5)
> we get a pair (6.4, 6.4)
> we get a pair (5.7, 5.7)
> we get a pair (6.3, 6.3)
> we get a pair (6.3, 6.3)
> we get a pair (6.1, 6.1)
> we get a pair (4.9, 4.9)
> we get a pair (7.7, 7.7)
> we get a pair (6.6, 6.6)
> we get a pair (6.3, 6.3)
> we get a pair (5.2, 5.2)
> we get a pair (6.4, 6.4)
> we get a pair (5.0, 5.0)
> we get a pair (6.0, 6.0)
> we get a pair (5.9, 5.9)
> we get a pair (6.9, 6.9)
> we get a pair (6.0, 6.0)
> we get a pair (6.7, 6.7)
> we get a pair (6.1, 6.1)
> we get a pair (6.9, 6.9)
> we get a pair (5.6, 5.6)
> we get a pair (5.8, 5.8)
> we get a pair (6.7, 6.7)
> we get a pair (6.8, 6.8)
> we get a pair (5.6, 5.6)
> we get a pair (6.7, 6.7)
> we get a pair (5.8, 5.8)
> we get a pair (6.7, 6.7)
> we get a pair (6.2, 6.2)
> we get a pair (6.3, 6.3)
> we get a pair (5.6, 5.6)
> we get a pair (6.5, 6.5)
> we get a pair (5.9, 5.9)
> we get a pair (6.2, 6.2)
> we get a pair (6.1, 6.1)
> we get a pair (5.9, 5.9)
> we get a pair (6.3, 6.3)
> we get a pair (6.1, 6.1)
> we get a pair (6.4, 6.4)
> we get a pair (6.6, 6.6)
> {code}
> However, in RRDD with the same setting we get:
> {code:title=log-from-zip-RRDD|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> we get a pair (5.1, 5.1)
> we get a pair (4.9, 4.7)
> we get a pair (4.6, 5.0)
> we get a pair (5.4, 4.6)
> we get a pair (5.0, 4.4)
> we get a pair (4.9, 5.4)
> we get a pair (4.8, 4.8)
> we get a pair (4.3, 5.8)
> we get a pair (5.7, 5.4)
> we get a pair (5.1, 5.7)
> we get a pair (5.1, 5.4)
> we get a pair (5.1, 4.6)
> we get a pair (5.1, 4.8)
> we get a pair (5.0, 5.0)
> we get a pair (5.2, 5.2)
> we get a pair (4.7, 4.8)
> we get a pair (5.4, 5.2)
> we get a pair (5.5, 4.9)
> we get a pair (5.0, 5.5)
> we get a pair (4.9, 4.4)
> we get a pair (5.1, 5.0)
> we get a pair (4.5, 4.4)
> we get a pair (5.0, 5.1)
> we get a pair (4.8, 5.1)
> we get a pair (4.6, 5.3)
> we get a pair (5.0, 7.0)
> we get a pair (6.4, 6.9)
> we get a pair (5.5, 6.5)
> we get a pair (5.7, 6.3)
> we get a pair (4.9, 6.6)
> we get a pair (5.2, 5.0)
> we get a pair (5.9, 6.0)
> we get a pair (6.1, 5.6)
> we get a pair (6.7, 5.6)
> we get a pair (5.8, 6.2)
> we get a pair (5.6, 5.9)
> we get a pair (6.1, 6.3)
> we get a pair (6.1, 6.4)
> we get a pair (6.6, 6.8)
> we get a pair (6.7, 6.0)
> we get a pair (5.7, 5.5)
> we get a pair (5.5, 5.8)
> we get a pair (6.0, 5.4)
> we get a pair (6.0, 6.7)
> we get a pair (6.3, 5.6)
> we get a pair (5.5, 5.5)
> we get a pair (6.1, 5.8)
> we get a pair (5.0, 5.6)
> we get a pair (5.7, 5.7)
> we get a pair (6.2, 5.1)
> we get a pair (5.7, 6.3)
> we get a pair (5.8, 7.1)
> we get a pair (6.3, 6.5)
> we get a pair (7.6, 4.9)
> we get a pair (7.3, 6.7)
> we get a pair (7.2, 6.5)
> we get a pair (6.4, 6.8)
> we get a pair (5.7, 5.8)
> we get a pair (6.4, 6.5)
> we get a pair (7.7, 7.7)
> we get a pair (6.0, 6.9)
> we get a pair (5.6, 7.7)
> we get a pair (6.3, 6.7)
> we get a pair (7.2, 6.2)
> we get a pair (6.1, 6.4)
> we get a pair (7.2, 7.4)
> we get a pair (7.9, 6.4)
> we get a pair (6.3, 6.1)
> we get a pair (7.7, 6.3)
> we get a pair (6.4, 6.0)
> we get a pair (6.9, 6.7)
> we get a pair (6.9, 5.8)
> we get a pair (6.8, 6.7)
> we get a pair (6.7, 6.3)
> we get a pair (6.5, 6.2)
> we need to close stream java.io.DataInputStream@507affd3 in thread 127
> we need to close stream java.io.DataInputStream@507affd3 in thread 127
> {code}
> We can see from the end of the log that the data stream is closed twice.
> The simplest way to avoid the error is using "cache" to cut off the lineage
> as shown below. However, sometimes we do not want to cache the data.
> {code:title=work-around-zip.scala|theme=FadeToGrey|linenumbers=true|language=scala|firstline=0001|collapse=false}
> val rdd = ...
> rdd.cache()
> val rdd2 = rdd.map(x => x)
> rdd.zip(rdd2).count()
> {code}
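
The mismatched pairs in the RRDD log are what one would expect if both sides of the zip end up draining a single underlying stream rather than each getting its own. The following is a minimal sketch of that effect using plain Scala iterators only. It does not use Spark or SparkR, it is not RRDD's actual code, and the names `SharedStreamSketch`, `partition`, and `zipPairs` as well as the sample values are hypothetical. It does not attempt to reproduce the exact interleaving seen in the log, which depends on timing.

```scala
// Simulation only: a plain iterator stands in for the R-side data stream.
// All names and values here are hypothetical and are not taken from Spark.
object SharedStreamSketch {

  // Stand-in for the contents of one partition.
  val partition: Vector[Double] = Vector(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)

  /** Pairs elements by pulling one from `left`, then one from `right`,
    * until either side is exhausted. */
  def zipPairs(left: Iterator[Double], right: Iterator[Double]): List[(Double, Double)] = {
    val pairs = List.newBuilder[(Double, Double)]
    while (left.hasNext && right.hasNext) {
      val l = left.next()
      val r = right.next()
      pairs += ((l, r))
    }
    pairs.result()
  }

  // Each side gets its own iterator over the partition: every pair matches.
  def independent: List[(Double, Double)] =
    zipPairs(partition.iterator, partition.iterator)

  // Both sides read from ONE iterator: each read steals the other side's
  // next element, so pairs are mismatched and only half as many come out.
  def shared: List[(Double, Double)] = {
    val stream = partition.iterator
    zipPairs(stream, stream)
  }

  def main(args: Array[String]): Unit = {
    independent.foreach(p => println(s"independent: $p"))
    shared.foreach(p => println(s"shared:      $p"))
  }
}
```

In this sketch the independent case yields six matching pairs, while the shared case yields three mismatched ones. A zip that insists on equal element counts could then fail in a way similar to the "can only zip rdd with same number of elements" error quoted in the description.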