[ https://issues.apache.org/jira/browse/SPARK-5066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14264508#comment-14264508 ]
Sean Owen commented on SPARK-5066:
----------------------------------

I'm not clear what this issue is trying to report. This is code from {{ExternalAppendOnlyMap}}, right? The javadoc says:

{code}
   * Fill a buffer with the next set of keys with the same hash code from a given iterator. We
   * read streams one hash code at a time to ensure we don't miss elements when they are merged.
   *
   * Assumes the given iterator is in sorted order of hash code.
{code}

The behavior and code you describe seem correct, then. k4 and k5 would be read from the stream for file 2 first, since they have the lowest hashes. Next, k1 would be read from both files. Where are you saying that this breaks down?

> Can not get all key that has same hashcode when reading key ordered from
> different Streaming.
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5066
>                 URL: https://issues.apache.org/jira/browse/SPARK-5066
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.2.0
>            Reporter: DoingDone9
>            Priority: Critical
>
> When spilling is enabled, data ordered by hashCode is spilled to disk. When
> merging values, we need to get all keys that have the same hashCode from the
> different tmp files, but the code only reads the keys with the minimum
> hashCode within each tmp file, so we cannot read all keys.
> Example:
> If file1 has [k1, k2, k3] and file2 has [k4, k5, k1],
> and hashcode of k4 < hashcode of k5 < hashcode of k1 < hashcode of k2 <
> hashcode of k3,
> we only read k1 from file1 and k4 from file2, so we cannot read all of k1.
> Code:
> private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
>
> inputStreams.foreach { it =>
>   val kcPairs = new ArrayBuffer[(K, C)]
>   readNextHashCode(it, kcPairs)
>   if (kcPairs.length > 0) {
>     mergeHeap.enqueue(new StreamBuffer(it, kcPairs))
>   }
> }
>
> private def readNextHashCode(it: BufferedIterator[(K, C)], buf: ArrayBuffer[(K, C)]): Unit = {
>   if (it.hasNext) {
>     var kc = it.next()
>     buf += kc
>     val minHash = hashKey(kc)
>     while (it.hasNext && it.head._1.hashCode() == minHash) {
>       kc = it.next()
>       buf += kc
>     }
>   }
> }

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
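The merge Sean Owen describes can be modelled with a short, self-contained Scala sketch. This is a simplified model under stated assumptions, not Spark's implementation. The keys are strings and the combiners are `Int` counts. A hypothetical lookup table `hashOf` stands in for `hashCode()` so that the issue's ordering k4 < k5 < k1 < k2 < k3 holds. `merge`, `hashOf`, and this `StreamBuffer` are illustrative names, not Spark's API.

Each stream still buffers only its own next hash-code group, which is the behavior the reporter observed. The loop, however, dequeues *every* buffer whose current hash equals the global minimum. That step is what brings file1's k1 and file2's k1 together in the same round.

```scala
import scala.collection.mutable.{ArrayBuffer, LinkedHashMap, PriorityQueue}

object HashMergeSketch {
  // Hypothetical stand-in for key.hashCode(), chosen so that
  // hash(k4) < hash(k5) < hash(k1) < hash(k2) < hash(k3), as in the issue.
  val hashOf: Map[String, Int] =
    Map("k4" -> 1, "k5" -> 2, "k1" -> 3, "k2" -> 4, "k3" -> 5)

  // One hash-sorted input stream plus the pairs buffered for its current hash code.
  final class StreamBuffer(
      val it: BufferedIterator[(String, Int)],
      val pairs: ArrayBuffer[(String, Int)]) {
    def minHash: Int = hashOf(pairs.head._1)
  }

  // Smallest hash first: PriorityQueue dequeues the "largest" element, hence .reverse.
  implicit val byMinHash: Ordering[StreamBuffer] =
    Ordering.by[StreamBuffer, Int](_.minHash).reverse

  // Same shape as the quoted readNextHashCode: buffer every pair sharing the next hash code.
  def readNextHashCode(it: BufferedIterator[(String, Int)], buf: ArrayBuffer[(String, Int)]): Unit = {
    if (it.hasNext) {
      var kc = it.next()
      buf += kc
      val minHash = hashOf(kc._1)
      while (it.hasNext && hashOf(it.head._1) == minHash) {
        kc = it.next()
        buf += kc
      }
    }
  }

  // Merges streams that are each sorted by hash, summing values per key.
  def merge(streams: Seq[Seq[(String, Int)]]): List[(String, Int)] = {
    val heap = PriorityQueue.empty[StreamBuffer]
    for (s <- streams) {
      val it = s.iterator.buffered
      val buf = ArrayBuffer.empty[(String, Int)]
      readNextHashCode(it, buf)
      if (buf.nonEmpty) heap.enqueue(new StreamBuffer(it, buf))
    }
    val out = ArrayBuffer.empty[(String, Int)]
    while (heap.nonEmpty) {
      // Pull out every stream whose current hash equals the global minimum,
      // so a key present in several spill files is seen in the same round.
      val first = heap.dequeue()
      val group = ArrayBuffer(first)
      while (heap.nonEmpty && heap.head.minHash == first.minHash) group += heap.dequeue()

      // Combine per key: distinct keys may share a hash, so group by key, not by hash.
      val combined = LinkedHashMap.empty[String, Int]
      for (sb <- group; (k, v) <- sb.pairs) combined(k) = combined.getOrElse(k, 0) + v
      out ++= combined

      // Refill each drained stream with its next hash-code group and re-enqueue it.
      for (sb <- group) {
        sb.pairs.clear()
        readNextHashCode(sb.it, sb.pairs)
        if (sb.pairs.nonEmpty) heap.enqueue(sb)
      }
    }
    out.toList
  }

  def main(args: Array[String]): Unit = {
    val file1 = Seq("k1" -> 1, "k2" -> 1, "k3" -> 1)
    val file2 = Seq("k4" -> 1, "k5" -> 1, "k1" -> 1)
    println(merge(Seq(file1, file2)))
    // prints List((k4,1), (k5,1), (k1,2), (k2,1), (k3,1))
  }
}
```

In this model, k4 and k5 are emitted first from file2. k1 then surfaces at the head of both streams at once and comes out a single time with both counts combined, matching the walk-through above. Grouping by key inside each hash group is a deliberate choice: two different keys can share a hash code. Spark's own iterator differs in details, for example in how it emits and combines keys.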