Github user scwf commented on the pull request:

    https://github.com/apache/spark/pull/9748#issuecomment-162860246
  
    @davies there is a problem when deserializing a RoaringBitmap. See the 
example below.
    Run this piece of code:
    ```
    import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
    import java.io.{DataInput, DataOutput, FileInputStream, FileOutputStream}
    import org.roaringbitmap.RoaringBitmap
    
        class KryoInputDataInputBridge(input: KryoInput) extends DataInput {
          override def readLong(): Long = input.readLong()
          override def readChar(): Char = input.readChar()
          override def readFloat(): Float = input.readFloat()
          override def readByte(): Byte = input.readByte()
          override def readShort(): Short = input.readShort()
          // readString in Kryo does UTF-8
          override def readUTF(): String = input.readString()
          override def readInt(): Int = input.readInt()
          override def readUnsignedShort(): Int = input.readShortUnsigned()
          override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt
          override def readFully(b: Array[Byte]): Unit = input.read(b)
          override def readFully(b: Array[Byte], off: Int, len: Int): Unit =
            input.read(b, off, len)
          override def readLine(): String =
            throw new UnsupportedOperationException("readLine")
          override def readBoolean(): Boolean = input.readBoolean()
          override def readUnsignedByte(): Int = input.readByteUnsigned()
          override def readDouble(): Double = input.readDouble()
        }
    
        class KryoOutputDataOutputBridge(output: KryoOutput) extends DataOutput {
          override def writeFloat(v: Float): Unit = output.writeFloat(v)
          // There is no "readChars" counterpart, except maybe "readLine",
          // which is not supported
          override def writeChars(s: String): Unit =
            throw new UnsupportedOperationException("writeChars")
          override def writeDouble(v: Double): Unit = output.writeDouble(v)
          // writeString in Kryo does UTF-8
          override def writeUTF(s: String): Unit = output.writeString(s)
          override def writeShort(v: Int): Unit = output.writeShort(v)
          override def writeInt(v: Int): Unit = output.writeInt(v)
          override def writeBoolean(v: Boolean): Unit = output.writeBoolean(v)
          override def write(b: Int): Unit = output.write(b)
          override def write(b: Array[Byte]): Unit = output.write(b)
          override def write(b: Array[Byte], off: Int, len: Int): Unit =
            output.write(b, off, len)
          override def writeBytes(s: String): Unit = output.writeString(s)
          override def writeChar(v: Int): Unit = output.writeChar(v.toChar)
          override def writeLong(v: Long): Unit = output.writeLong(v)
          override def writeByte(v: Int): Unit = output.writeByte(v)
        }
        val outStream = new FileOutputStream("D:\\wfserde")
        val output = new KryoOutput(outStream)
        val bitmap = new RoaringBitmap
        bitmap.add(1)
        bitmap.add(3)
        bitmap.add(5)
        bitmap.serialize(new KryoOutputDataOutputBridge(output))
        output.flush()
        output.close()
    
        val inStream = new FileInputStream("D:\\wfserde")
        val input = new KryoInput(inStream)
        val ret = new RoaringBitmap
        ret.deserialize(new KryoInputDataInputBridge(input))
    
        println(ret)
    ```
    
    This throws a `Buffer underflow` error:
    ```
    com.esotericsoftware.kryo.KryoException: Buffer underflow.
        at com.esotericsoftware.kryo.io.Input.require(Input.java:156)
        at com.esotericsoftware.kryo.io.Input.skip(Input.java:131)
        at com.esotericsoftware.kryo.io.Input.skip(Input.java:264)
        at org.apache.spark.sql.SQLQuerySuite$$anonfun$6$KryoInputDataInputBridge$1.skipBytes
    ```
    
    After some investigation, I found this is caused by a bug in Kryo's 
`Input.skip(long count)` (https://github.com/EsotericSoftware/kryo/issues/119), 
which we call in the `skipBytes` method of `KryoInputDataInputBridge`.
    
    So I think we can fix this issue in one of two ways:
    1) Upgrade Kryo to 2.23.0 or 2.24.0, which fixes this bug (I am not sure 
the upgrade is safe in Spark; can you check it, @davies?)
    
    2) Bypass Kryo's `Input.skip(long count)` by directly calling the other 
`skip` method in Kryo's Input.java 
(https://github.com/EsotericSoftware/kryo/blob/kryo-2.21/src/com/esotericsoftware/kryo/io/Input.java#L124), 
i.e. write the bug-fixed version of `Input.skip(long count)` in 
KryoInputDataInputBridge's `skipBytes` method:
    ```
        class KryoInputDataInputBridge(input: KryoInput) extends DataInput {
          ...
          override def skipBytes(n: Int): Int = {
            var remaining: Long = n
            while (remaining > 0) {
              // Never request more than what is left, so the Int overload
              // of skip is called instead of skip(long)
              val skip = math.min(Int.MaxValue.toLong, remaining).toInt
              input.skip(skip)
              remaining -= skip
            }
            n
          }
          ...
        }
    ```
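
    To show why the clamp matters, here is a minimal Kryo-free sketch. 
`ChunkedSkipSketch` and `requestedChunks` are hypothetical names used only 
for illustration, and the `max` variant assumes the bug is a max-for-min 
mix-up in the clamp, which is my reading of the linked issue rather than 
something verified against the Kryo source here:
    ```
    object ChunkedSkipSketch {
      // Hypothetical helper: splits a long skip count into Int-sized requests,
      // clamping each request with `clamp`, and returns the sizes requested.
      def requestedChunks(count: Long, clamp: (Long, Long) => Long): List[Int] = {
        var remaining = count
        val chunks = List.newBuilder[Int]
        while (remaining > 0) {
          val chunk = clamp(Int.MaxValue.toLong, remaining).toInt
          chunks += chunk
          remaining -= chunk
        }
        chunks.result()
      }

      def main(args: Array[String]): Unit = {
        // Clamping with min never asks for more than what is left.
        println(requestedChunks(8L, (a, b) => math.min(a, b)))
        // Clamping with max asks for Int.MaxValue bytes even though only 8
        // remain, which a short buffer cannot satisfy.
        println(requestedChunks(8L, (a, b) => math.max(a, b)))
      }
    }
    ```
    With `min`, an 8-byte skip is requested as a single 8-byte chunk; with 
`max`, the very first request is for `Int.MaxValue` bytes, which would explain 
the underflow on a file that only holds a small bitmap.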
    
    
    
    


