I was looking at https://github.com/twitter/chill

It seems this would achieve what you want:
chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala
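For illustration, here is a minimal sketch of the idea applied to your registrator. Everything below is an assumption on my part: the class name ArrayOfIBWSerializer is made up, and it targets the Kryo 2.x Serializer API that Spark 1.1 ships (chill's WrappedArraySerializer itself handles Scala's WrappedArray, so for a plain Array[T] you would register something along these lines that delegates element-wise to the already-registered component serializer):

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical serializer: writes the length, then each element via the
// serializer already registered for ImmutableBytesWritable.
class ArrayOfIBWSerializer extends Serializer[Array[ImmutableBytesWritable]] {
  override def write(kryo: Kryo, out: Output,
                     arr: Array[ImmutableBytesWritable]): Unit = {
    out.writeInt(arr.length, true)
    arr.foreach(kryo.writeObject(out, _))
  }
  override def read(kryo: Kryo, in: Input,
                    t: Class[Array[ImmutableBytesWritable]]): Array[ImmutableBytesWritable] = {
    Array.fill(in.readInt(true))(kryo.readObject(in, classOf[ImmutableBytesWritable]))
  }
}

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[ImmutableBytesWritable])
    kryo.register(classOf[Array[ImmutableBytesWritable]], new ArrayOfIBWSerializer)
  }
}
```

Untested sketch, not a drop-in fix — but the chill source file above shows the same pattern done properly.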

Cheers

On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao <xiaotao.cs....@gmail.com> wrote:

> I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be
> serialized by Kryo but *Array[ImmutableBytesWritable]* can't be
> serialized, even though I registered both of them in Kryo.
>
> The code is as follows:
>
>         val conf = new SparkConf()
>           .setAppName("Hello Spark")
>           .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>           .set("spark.kryo.registrator", "xt.MyKryoRegistrator")
>
>         val sc = new SparkContext(conf)
>
>         val rdd = sc.parallelize(List(
>           (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new KeyValue()),
>           (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new KeyValue()),
>           (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new KeyValue()),
>           (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new KeyValue())), 4)
>
>         // snippet 1: a single *ImmutableBytesWritable* object can be
>         // serialized in a broadcast
>         val partitioner = new SingleElementPartitioner(
>           sc.broadcast(new ImmutableBytesWritable(Bytes.toBytes(3))))
>         val ret = rdd.aggregateByKey(List[KeyValue](), partitioner)(
>           (xs: List[KeyValue], y: KeyValue) => y :: xs,
>           (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys).persist()
>         println("\n\n\nret.count = " + ret.count + ", partition size = " + ret.partitions.size)
>
>         // snippet 2: an array of *ImmutableBytesWritable* cannot be
>         // serialized in a broadcast
>         val arr = Array(
>           new ImmutableBytesWritable(Bytes.toBytes(1)),
>           new ImmutableBytesWritable(Bytes.toBytes(2)),
>           new ImmutableBytesWritable(Bytes.toBytes(3)))
>         val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
>         val ret1 = rdd.aggregateByKey(List[KeyValue](), newPartitioner)(
>           (xs: List[KeyValue], y: KeyValue) => y :: xs,
>           (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys)
>         println("\n\n\nrdd2.count = " + ret1.count)
>
>         sc.stop
>
>
>         // the following are the Kryo registrator and the partitioners
>         class MyKryoRegistrator extends KryoRegistrator {
>           override def registerClasses(kryo: Kryo): Unit = {
>             kryo.register(classOf[ImmutableBytesWritable])          // register ImmutableBytesWritable
>             kryo.register(classOf[Array[ImmutableBytesWritable]])  // register Array[ImmutableBytesWritable]
>           }
>         }
>
>         class SingleElementPartitioner(bc: Broadcast[ImmutableBytesWritable]) extends Partitioner {
>           override def numPartitions: Int = 5
>           def v = Bytes.toInt(bc.value.get)
>           override def getPartition(key: Any): Int = v - 1
>         }
>
>         class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
>           val arr = bc.value
>           override def numPartitions: Int = arr.length
>           override def getPartition(key: Any): Int = Bytes.toInt(arr(0).get)
>         }
>
>
>
> In the code above, snippet 1 works as expected, but snippet 2 throws
> "Task not serializable: java.io.NotSerializableException:
> org.apache.hadoop.hbase.io.ImmutableBytesWritable".
>
>
> So do I have to implement a Kryo serializer for Array[T] if it is used in
> a broadcast?
>
> Thanks
