[ https://issues.apache.org/jira/browse/SPARK-5066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

DoingDone9 updated SPARK-5066:
------------------------------
    Description: 
When spilling is enabled, data is spilled to disk sorted by key hashCode. When merging values, we need to read every key that has the same hashCode from all of the temp files, but the merge only reads the keys with the minimum hashCode within each temp file, so it cannot read all of them.
Example:
If file1 contains [k1, k2, k3] and file2 contains [k4, k5, k1],
and hashCode(k4) < hashCode(k5) < hashCode(k1) < hashCode(k2) < hashCode(k3),
then we only read k1 from file1 and k4 from file2, so we cannot read all occurrences of k1.
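The requirement stated above (gather every pair whose key has the current minimum hashCode from all spill files before emitting a key) can be modeled with a small standalone k-way merge. This is a simplified sketch, not Spark's ExternalAppendOnlyMap code: the helper mergeByHash and the object name are hypothetical, the spill files are plain in-memory sequences, and values are combined by addition. Single-character Strings are used as keys because their hash code is their character code, so "a" < "b" < "c" < "d" < "e" reproduces the ordering k4 < k5 < k1 < k2 < k3 from the example.

```scala
import scala.collection.mutable

object HashMergeSketch {
  // Hypothetical helper: merges streams that are each sorted by key hash code.
  // For the smallest pending hash, it drains that hash group from EVERY stream
  // whose next key has that hash, combining values of equal keys, before
  // emitting anything. Distinct keys that collide stay separate because the
  // group is a map keyed by the key itself.
  def mergeByHash[K, C](streams: Seq[Iterator[(K, C)]])(mergeCombiners: (C, C) => C): List[(K, C)] = {
    val buffered = streams.map(_.buffered)
    // Min-heap of (hash of next key, stream index).
    val heap = mutable.PriorityQueue.empty[(Int, Int)](Ordering.by[(Int, Int), Int](_._1).reverse)
    for ((it, i) <- buffered.zipWithIndex if it.hasNext) heap.enqueue((it.head._1.hashCode(), i))

    val out = mutable.ListBuffer.empty[(K, C)]
    while (heap.nonEmpty) {
      val minHash = heap.head._1
      val group = mutable.LinkedHashMap.empty[K, C]
      // Visit every stream currently positioned at minHash.
      while (heap.nonEmpty && heap.head._1 == minHash) {
        val (_, i) = heap.dequeue()
        val it = buffered(i)
        while (it.hasNext && it.head._1.hashCode() == minHash) {
          val (k, c) = it.next()
          group(k) = group.get(k).fold(c)(prev => mergeCombiners(prev, c))
        }
        // Re-enqueue the stream at its next (larger) hash, if it has one.
        if (it.hasNext) heap.enqueue((it.head._1.hashCode(), i))
      }
      out ++= group
    }
    out.toList
  }

  // Single-character Strings hash to their character code.
  val file1 = Seq("c" -> 1, "d" -> 1, "e" -> 1) // stands in for [k1, k2, k3]
  val file2 = Seq("a" -> 1, "b" -> 1, "c" -> 1) // stands in for [k4, k5, k1]
  val merged: List[(String, Int)] = mergeByHash(Seq(file1.iterator, file2.iterator))(_ + _)

  def main(args: Array[String]): Unit =
    println(merged) // List((a,1), (b,1), (c,2), (d,1), (e,1))
}
```

In this sketch the heap is keyed on each stream's next hash, so once the smallest pending hash reaches hashCode("c") both files are drained for that hash and the two values for "c" are combined into ("c", 2).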

Code:

private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)

// Buffer the first hash group of each stream and push it onto the merge heap.
inputStreams.foreach { it =>
  val kcPairs = new ArrayBuffer[(K, C)]
  readNextHashCode(it, kcPairs)
  if (kcPairs.length > 0) {
    mergeHeap.enqueue(new StreamBuffer(it, kcPairs))
  }
}

// Read the next pair, plus every following pair whose key has the same hash code.
private def readNextHashCode(it: BufferedIterator[(K, C)], buf: ArrayBuffer[(K, C)]): Unit = {
  if (it.hasNext) {
    var kc = it.next()
    buf += kc
    val minHash = hashKey(kc)
    while (it.hasNext && it.head._1.hashCode() == minHash) {
      kc = it.next()
      buf += kc
    }
  }
}
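The quoted readNextHashCode buffers one hash group per call: the first pair plus every following pair whose key has the same hashCode, which can include distinct keys that collide. The standalone copy below assumes hashKey(kc) returns kc._1.hashCode(); the object name is hypothetical. It uses "Aa" and "BB", two distinct Java Strings that share the hash code 2112.

```scala
import scala.collection.mutable.ArrayBuffer

object ReadNextHashCodeDemo {
  // Standalone copy of the quoted helper. hashKey(kc) is replaced by
  // kc._1.hashCode() (assumed to be what hashKey computes).
  def readNextHashCode[K, C](it: BufferedIterator[(K, C)], buf: ArrayBuffer[(K, C)]): Unit = {
    if (it.hasNext) {
      val kc = it.next()
      buf += kc
      val minHash = kc._1.hashCode()
      // Keep taking pairs while the next key has the same hash code.
      while (it.hasNext && it.head._1.hashCode() == minHash) {
        buf += it.next()
      }
    }
  }

  // "Aa" and "BB" both hash to 2112; "Ca" hashes to 2174,
  // so the stream is sorted by hash code.
  val stream: BufferedIterator[(String, Int)] = Iterator("Aa" -> 1, "BB" -> 2, "Ca" -> 3).buffered
  val firstGroup: ArrayBuffer[(String, Int)] = ArrayBuffer.empty
  readNextHashCode(stream, firstGroup)
  // One call buffers both colliding keys and leaves "Ca" unread.
  val remaining: List[(String, Int)] = stream.toList

  def main(args: Array[String]): Unit = {
    println(firstGroup) // ArrayBuffer((Aa,1), (BB,2))
    println(remaining)  // List((Ca,3))
  }
}
```

Each call therefore buffers exactly one hash group from a stream; pairs with a larger hash stay in the iterator until a later call.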



> Can not get all key that has same hashcode when reading key ordered from different Streaming.
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5066
>                 URL: https://issues.apache.org/jira/browse/SPARK-5066
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.2.0
>            Reporter: DoingDone9
>            Priority: Critical



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)