I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be
serialized by Kryo but *Array[ImmutableBytesWritable] *can't be serialized
even when I registered both of them in Kryo.

The code is as follows:

       val conf = new SparkConf()
                .setAppName("Hello Spark")
                .set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
                .set("spark.kryo.registrator", "xt.MyKryoRegistrator")

        val sc = new SparkContext(conf)

        val rdd = sc.parallelize(List(
                    (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new
KeyValue()),
                    (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new
KeyValue()),
                    (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new
KeyValue()),
                    (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new
KeyValue())), 4)

        // snippet 1:  a single object of *ImmutableBytesWritable* can be
serialized in broadcast
        val partitioner = new SingleElementPartitioner(sc.broadcast(new
ImmutableBytesWritable(Bytes.toBytes(3))))
        val ret = rdd.aggregateByKey(List[KeyValue](),
partitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
 (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys ).persist()
        println("\n\n\ret.count = " + ret.count + ",  partition size = " +
ret.partitions.size)

        // snippet 2: an array of *ImmutableBytesWritable* can not be
serialized in broadcast
        val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)), new
ImmutableBytesWritable(Bytes.toBytes(2)), new
ImmutableBytesWritable(Bytes.toBytes(3)))
        val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
        val ret1 = rdd.aggregateByKey(List[KeyValue](),
newPartitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
 (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys )
        println("\n\n\nrdd2.count = " + ret1.count)

        sc.stop


      // the following are kryo registrator and partitioners
       class MyKryoRegistrator extends KryoRegistrator {
            override def registerClasses(kryo: Kryo): Unit = {
                 kryo.register(classOf[ImmutableBytesWritable])   //
register ImmutableBytesWritable
                 kryo.register(classOf[Array[ImmutableBytesWritable]])
 // register
Array[ImmutableBytesWritable]
            }
       }

       class SingleElementPartitioner(bc:
Broadcast[ImmutableBytesWritable]) extends Partitioner {
            override def numPartitions: Int = 5
            def v = Bytes.toInt(bc.value.get)
            override def getPartition(key: Any): Int =  v - 1
       }


        class ArrayPartitioner(bc:
Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
            val arr = bc.value
            override def numPartitions: Int = arr.length
            override def getPartition(key: Any): Int =
Bytes.toInt(arr(0).get)
        }



In the code above, snippet 1 can work as expected. But snippet 2 throws
"Task not serializable: java.io.NotSerializableException:
org.apache.hadoop.hbase.io.ImmutableBytesWritable"  .


So do I have to implement a Kryo serializer for Array[T] if it is used in
broadcast ?

Thanks

Reply via email to