I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be serialized by Kryo, but *Array[ImmutableBytesWritable]* can't, even though I registered both of them with Kryo.
The code is as follows:

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits (aggregateByKey) in Spark 1.1
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.serializer.KryoRegistrator

val conf = new SparkConf()
  .setAppName("Hello Spark")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "xt.MyKryoRegistrator")
val sc = new SparkContext(conf)

val rdd = sc.parallelize(List(
  (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new KeyValue()),
  (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new KeyValue()),
  (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new KeyValue()),
  (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new KeyValue())), 4)

// snippet 1: a single ImmutableBytesWritable can be serialized in a broadcast
val partitioner = new SingleElementPartitioner(sc.broadcast(new ImmutableBytesWritable(Bytes.toBytes(3))))
val ret = rdd.aggregateByKey(List[KeyValue](), partitioner)(
  (xs: List[KeyValue], y: KeyValue) => y :: xs,
  (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys).persist()
println("\n\n\nret.count = " + ret.count + ", partition size = " + ret.partitions.size)

// snippet 2: an array of ImmutableBytesWritable can NOT be serialized in a broadcast
val arr = Array(
  new ImmutableBytesWritable(Bytes.toBytes(1)),
  new ImmutableBytesWritable(Bytes.toBytes(2)),
  new ImmutableBytesWritable(Bytes.toBytes(3)))
val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
val ret1 = rdd.aggregateByKey(List[KeyValue](), newPartitioner)(
  (xs: List[KeyValue], y: KeyValue) => y :: xs,
  (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys)
println("\n\n\nret1.count = " + ret1.count)
sc.stop()

// the following are the Kryo registrator and the partitioners
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[ImmutableBytesWritable])        // register ImmutableBytesWritable
    kryo.register(classOf[Array[ImmutableBytesWritable]]) // register Array[ImmutableBytesWritable]
  }
}

class SingleElementPartitioner(bc: Broadcast[ImmutableBytesWritable]) extends Partitioner {
  override def numPartitions: Int = 5
  def v = Bytes.toInt(bc.value.get)
  override def getPartition(key: Any): Int = v - 1
}

class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
  val arr = bc.value
  override def numPartitions: Int = arr.length
  override def getPartition(key: Any): Int = Bytes.toInt(arr(0).get)
}
```

In the code above, snippet 1 works as expected, but snippet 2 throws "Task not serializable: java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable". So do I have to implement a Kryo serializer for Array[T] if it is used in a broadcast? Thanks
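One difference between the two partitioners that may be relevant: `ArrayPartitioner` copies `bc.value` into a `val` field, while `SingleElementPartitioner` only reads it through a `def`, so only the former holds `ImmutableBytesWritable` objects directly. In Spark 1.x, tasks (including the shuffle dependency and its partitioner) are serialized with the Java closure serializer (`spark.closure.serializer`), not the Kryo serializer set via `spark.serializer`. The minimal plain-Scala sketch below (no Spark or HBase) shows how Java serialization treats the two patterns. `Payload`, `Handle`, `EagerHolder`, `LazyHolder`, and `SerializationDemo` are hypothetical stand-ins; `Handle` only approximates a broadcast handle by marking its value `@transient`.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for ImmutableBytesWritable: NOT java.io.Serializable.
class Payload(val n: Int)

// Hypothetical stand-in for Broadcast[T]: the handle is Serializable and its
// value is @transient, so serializing the handle does not write the value.
class Handle(@transient val value: Payload) extends Serializable

// Like ArrayPartitioner: copies the value into a field (`val`).
class EagerHolder(h: Handle) extends Serializable {
  val payload: Payload = h.value
}

// Like SingleElementPartitioner: reads through the handle on demand (`def`),
// so the only field Scala generates here is the handle `h`.
class LazyHolder(h: Handle) extends Serializable {
  def payload: Payload = h.value
}

object SerializationDemo {
  // Returns true if `obj` can be written with plain Java serialization.
  def javaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val handle = new Handle(new Payload(3))
    println(javaSerializable(new EagerHolder(handle))) // false: Payload is written as a field
    println(javaSerializable(new LazyHolder(handle)))  // true: only the handle is written
  }
}
```

Unlike a real `Broadcast`, this stand-in does not re-fetch its value after deserialization; it only illustrates which fields Java serialization tries to write.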