Fei Wang created SPARK-12222:
--------------------------------

             Summary: deserialize RoaringBitmap using Kryo serializer throw Buffer underflow exception
                 Key: SPARK-12222
                 URL: https://issues.apache.org/jira/browse/SPARK-12222
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 1.6.0
            Reporter: Fei Wang


There is a problem when deserializing a RoaringBitmap through Kryo. To reproduce it, run this piece of code:
```
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import java.io.{DataInput, DataOutput, FileInputStream, FileOutputStream}
import org.roaringbitmap.RoaringBitmap

    class KryoInputDataInputBridge(input: KryoInput) extends DataInput {
      override def readLong(): Long = input.readLong()
      override def readChar(): Char = input.readChar()
      override def readFloat(): Float = input.readFloat()
      override def readByte(): Byte = input.readByte()
      override def readShort(): Short = input.readShort()
      // readString in kryo does utf8
      override def readUTF(): String = input.readString()
      override def readInt(): Int = input.readInt()
      override def readUnsignedShort(): Int = input.readShortUnsigned()
      override def skipBytes(n: Int): Int = input.skip(n.toLong).toInt
      override def readFully(b: Array[Byte]): Unit = input.read(b)
      override def readFully(b: Array[Byte], off: Int, len: Int): Unit =
        input.read(b, off, len)
      override def readLine(): String =
        throw new UnsupportedOperationException("readLine")
      override def readBoolean(): Boolean = input.readBoolean()
      override def readUnsignedByte(): Int = input.readByteUnsigned()
      override def readDouble(): Double = input.readDouble()
    }

    class KryoOutputDataOutputBridge(output: KryoOutput) extends DataOutput {
      override def writeFloat(v: Float): Unit = output.writeFloat(v)
      // There is no "readChars" counterpart, except maybe "readLine", which is not supported
      override def writeChars(s: String): Unit =
        throw new UnsupportedOperationException("writeChars")
      override def writeDouble(v: Double): Unit = output.writeDouble(v)
      // writeString in kryo does UTF8
      override def writeUTF(s: String): Unit = output.writeString(s)
      override def writeShort(v: Int): Unit = output.writeShort(v)
      override def writeInt(v: Int): Unit = output.writeInt(v)
      override def writeBoolean(v: Boolean): Unit = output.writeBoolean(v)
      override def write(b: Int): Unit = output.write(b)
      override def write(b: Array[Byte]): Unit = output.write(b)
      override def write(b: Array[Byte], off: Int, len: Int): Unit =
        output.write(b, off, len)
      override def writeBytes(s: String): Unit = output.writeString(s)
      override def writeChar(v: Int): Unit = output.writeChar(v.toChar)
      override def writeLong(v: Long): Unit = output.writeLong(v)
      override def writeByte(v: Int): Unit = output.writeByte(v)
    }
    val outStream = new FileOutputStream("D:\\wfserde")
    val output = new KryoOutput(outStream)
    val bitmap = new RoaringBitmap
    bitmap.add(1)
    bitmap.add(3)
    bitmap.add(5)
    bitmap.serialize(new KryoOutputDataOutputBridge(output))
    output.flush()
    output.close()

    val inStream = new FileInputStream("D:\\wfserde")
    val input = new KryoInput(inStream)
    val ret = new RoaringBitmap
    ret.deserialize(new KryoInputDataInputBridge(input))

    println(ret)
```

This throws a `Buffer underflow` error:
```
com.esotericsoftware.kryo.KryoException: Buffer underflow.
        at com.esotericsoftware.kryo.io.Input.require(Input.java:156)
        at com.esotericsoftware.kryo.io.Input.skip(Input.java:131)
        at com.esotericsoftware.kryo.io.Input.skip(Input.java:264)
        at org.apache.spark.sql.SQLQuerySuite$$anonfun$6$KryoInputDataInputBridge$1.skipBytes
```

After some investigation, I found that this is caused by a bug in Kryo's
`Input.skip(long count)` (https://github.com/EsotericSoftware/kryo/issues/119),
which we call in the `skipBytes` method of `KryoInputDataInputBridge`.
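
To illustrate the kind of failure involved, here is a minimal standalone sketch. It assumes, based on my reading of the linked Kryo issue rather than anything confirmed in this report, that the faulty loop picks each chunk size with `max` where `min` was intended. `SkipChunks`, `chunks`, `capped` and `uncapped` are hypothetical names used only for illustration; they are not part of Kryo or Spark.

```scala
import scala.collection.mutable.ListBuffer

object SkipChunks {
  // Hypothetical helper: records the chunk sizes a skip loop would request.
  // `pick` chooses the next chunk size from the remaining byte count.
  def chunks(count: Long)(pick: Long => Int): List[Int] = {
    var remaining = count
    val requested = ListBuffer.empty[Int]
    while (remaining > 0) {
      val step = pick(remaining)
      requested += step
      remaining -= step
    }
    requested.toList
  }

  // Capping with min never asks for more than what is left to skip.
  def capped(count: Long): List[Int] =
    chunks(count)(r => math.min(Int.MaxValue.toLong, r).toInt)

  // Assumed buggy variant: max asks for at least Int.MaxValue bytes,
  // even when only a few bytes remain.
  def uncapped(count: Long): List[Int] =
    chunks(count)(r => math.max(Int.MaxValue.toLong, r).toInt)
}
```

Under that assumption, a request to skip 16 bytes becomes a single request for `Int.MaxValue` bytes in the `uncapped` variant, which would be consistent with an underflow on a short stream such as the one in the reproduction above.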

I think we can fix this issue in one of two ways:
1) Upgrade Kryo to 2.23.0 or 2.24.0, which include the fix for this bug
(I am not sure the upgrade is safe in Spark; can you check it, @davies?)

2) Bypass Kryo's `Input.skip(long count)` by directly calling the other
`skip` method in Kryo's Input.java
(https://github.com/EsotericSoftware/kryo/blob/kryo-2.21/src/com/esotericsoftware/kryo/io/Input.java#L124),
i.e. write a bug-fixed version of `Input.skip(long count)` in the `skipBytes`
method of `KryoInputDataInputBridge`:
```
    class KryoInputDataInputBridge(input: KryoInput) extends DataInput {
      ...
      override def skipBytes(n: Int): Int = {
        var remaining: Long = n
        while (remaining > 0) {
          // Cap each chunk at Int.MaxValue so the Int overload of skip is called
          val skip = Math.min(Integer.MAX_VALUE, remaining).asInstanceOf[Int]
          input.skip(skip)
          remaining -= skip
        }
        n
      }
      ...
    }
```


