I was looking at https://github.com/twitter/chill
It seems this would achieve what you want:
chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala

Cheers

On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao <xiaotao.cs....@gmail.com> wrote:

> I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be
> serialized by Kryo but *Array[ImmutableBytesWritable]* can't be
> serialized even when I registered both of them in Kryo.
>
> The code is as follows:
>
>     val conf = new SparkConf()
>       .setAppName("Hello Spark")
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>       .set("spark.kryo.registrator", "xt.MyKryoRegistrator")
>
>     val sc = new SparkContext(conf)
>
>     val rdd = sc.parallelize(List(
>       (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new KeyValue()),
>       (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new KeyValue()),
>       (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new KeyValue()),
>       (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new KeyValue())), 4)
>
>     // snippet 1: a single object of *ImmutableBytesWritable* can be serialized in broadcast
>     val partitioner = new SingleElementPartitioner(sc.broadcast(new ImmutableBytesWritable(Bytes.toBytes(3))))
>     val ret = rdd.aggregateByKey(List[KeyValue](), partitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
>       (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys ).persist()
>     println("\n\n\ret.count = " + ret.count + ", partition size = " + ret.partitions.size)
>
>     // snippet 2: an array of *ImmutableBytesWritable* can not be serialized in broadcast
>     val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)), new ImmutableBytesWritable(Bytes.toBytes(2)),
>       new ImmutableBytesWritable(Bytes.toBytes(3)))
>     val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
>     val ret1 = rdd.aggregateByKey(List[KeyValue](), newPartitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
>       (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys )
>     println("\n\n\nrdd2.count = " + ret1.count)
>
>     sc.stop
>
>
>     // the following are kryo registrator and partitioners
>     class MyKryoRegistrator extends KryoRegistrator {
>       override def registerClasses(kryo: Kryo): Unit = {
>         kryo.register(classOf[ImmutableBytesWritable])        // register ImmutableBytesWritable
>         kryo.register(classOf[Array[ImmutableBytesWritable]]) // register Array[ImmutableBytesWritable]
>       }
>     }
>
>     class SingleElementPartitioner(bc: Broadcast[ImmutableBytesWritable]) extends Partitioner {
>       override def numPartitions: Int = 5
>       def v = Bytes.toInt(bc.value.get)
>       override def getPartition(key: Any): Int = v - 1
>     }
>
>
>     class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
>       val arr = bc.value
>       override def numPartitions: Int = arr.length
>       override def getPartition(key: Any): Int = Bytes.toInt(arr(0).get)
>     }
>
>
> In the code above, snippet 1 can work as expected. But snippet 2 throws
> "Task not serializable: java.io.NotSerializableException:
> org.apache.hadoop.hbase.io.ImmutableBytesWritable" .
>
>
> So do I have to implement a Kryo serializer for Array[T] if it is used in
> broadcast ?
>
> Thanks
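
One more thing that may be worth checking in the quoted code; this is a guess on my part, not something I have verified against Spark 1.1.0. The exception is a java.io.NotSerializableException, which suggests the partitioner itself is going through plain Java serialization rather than Kryo. ArrayPartitioner stores the array in a field (val arr = bc.value), while SingleElementPartitioner only reads the broadcast inside a def. The sketch below reproduces that difference without Spark. NotJavaSerializable, Handle, EagerPartitioner, LazyPartitioner and SerializationCheck are hypothetical stand-ins (for ImmutableBytesWritable, Broadcast and the two partitioners), not real Spark or HBase classes.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a class that does not implement java.io.Serializable.
final class NotJavaSerializable(val bytes: Array[Byte])

// Hypothetical stand-in for a broadcast handle: the handle is serializable,
// but the wrapped value is transient and is not written with it.
final class Handle[T](@transient val value: T) extends Serializable

// Mirrors the `val arr = bc.value` pattern: the array becomes a field,
// so Java serialization tries to write every element.
class EagerPartitioner(h: Handle[Array[NotJavaSerializable]]) extends Serializable {
  val arr = h.value
  def numPartitions: Int = arr.length
}

// Mirrors the `def v = ...` pattern: only the handle is kept as a field,
// and the value is looked up each time it is needed.
class LazyPartitioner(h: Handle[Array[NotJavaSerializable]]) extends Serializable {
  def arr = h.value
  def numPartitions: Int = arr.length
}

object SerializationCheck {
  // Returns true if `o` can be written with plain Java serialization.
  def isJavaSerializable(o: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

If that is what is happening here, changing val arr to def arr (or to a @transient lazy val) in ArrayPartitioner might be enough, independent of any Kryo registration.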