Re: Array in broadcast can't be serialized

2015-02-16 Thread Tao Xiao
Thanks Ted

After searching for a whole day, I still don't know how to make Spark use
Twitter Chill serialization; there is very little documentation on how to
integrate Twitter Chill into Spark for serialization. I tried the
following, but a "java.lang.ClassCastException:
com.twitter.chill.WrappedArraySerializer cannot be cast to
org.apache.spark.serializer.Serializer" was thrown:

val conf = new SparkConf()
   .setAppName("Test Serialization")

So what is the correct way to configure Spark to use the Twitter Chill
serialization framework?

2015-02-15 22:23 GMT+08:00 Ted Yu :

> I was looking at
> It seems this would achieve what you want:
> chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala
> Cheers
> On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao 
> wrote:
>> I'm using Spark 1.1.0 and found that *ImmutableBytesWritable* can be
>> serialized by Kryo but *Array[ImmutableBytesWritable]* can't be
>> serialized, even though I registered both of them in Kryo.
>> The code is as follows:
>> import org.apache.hadoop.hbase.KeyValue
>> import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>> import org.apache.hadoop.hbase.util.Bytes
>> import org.apache.spark.{SparkConf, SparkContext}
>> import org.apache.spark.SparkContext._ // pair-RDD functions (needed before Spark 1.3)
>>
>> val conf = new SparkConf()
>>   .setAppName("Hello Spark")
>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>   .set("spark.kryo.registrator", "xt.MyKryoRegistrator")
>> val sc = new SparkContext(conf)
>> val rdd = sc.parallelize(List(
>>   (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new KeyValue()),
>>   (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new KeyValue()),
>>   (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new KeyValue()),
>>   (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new KeyValue())), 4)
>>
>> // snippet 1: a single ImmutableBytesWritable can be serialized in a broadcast
>> val partitioner = new SingleElementPartitioner(
>>   sc.broadcast(new ImmutableBytesWritable(Bytes.toBytes(3))))
>> val ret = rdd.aggregateByKey(List[KeyValue](), partitioner)(
>>   (xs: List[KeyValue], y: KeyValue) => y :: xs,
>>   (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys).persist()
>> println("\n\n\nret.count = " + ret.count + ", partition size = " + ret.partitions.size)
>>
>> // snippet 2: an array of ImmutableBytesWritable can NOT be serialized in a broadcast
>> val arr = Array(
>>   new ImmutableBytesWritable(Bytes.toBytes(1)),
>>   new ImmutableBytesWritable(Bytes.toBytes(2)),
>>   new ImmutableBytesWritable(Bytes.toBytes(3)))
>> val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
>> val ret1 = rdd.aggregateByKey(List[KeyValue](), newPartitioner)(
>>   (xs: List[KeyValue], y: KeyValue) => y :: xs,
>>   (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys)
>> println("\n\n\nrdd2.count = " + ret1.count)
>> sc.stop()
>> // the following are the Kryo registrator and partitioners
>> import com.esotericsoftware.kryo.Kryo
>> import org.apache.spark.Partitioner
>> import org.apache.spark.broadcast.Broadcast
>> import org.apache.spark.serializer.KryoRegistrator
>>
>> class MyKryoRegistrator extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo): Unit = {
>>     kryo.register(classOf[ImmutableBytesWritable])        // register ImmutableBytesWritable
>>     kryo.register(classOf[Array[ImmutableBytesWritable]]) // register Array[ImmutableBytesWritable]
>>   }
>> }
>>
>> class SingleElementPartitioner(bc: Broadcast[ImmutableBytesWritable]) extends Partitioner {
>>   override def numPartitions: Int = 5
>>   def v = Bytes.toInt(bc.value.get)
>>   override def getPartition(key: Any): Int = v - 1
>> }
>>
>> class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
>>   val arr = bc.value
>>   override def numPartitions: Int = arr.length
>>   override def getPartition(key: Any): Int = Bytes.toInt(arr(0).get)
>> }
>> In the code above, snippet 1 works as expected, but snippet 2 throws
>> "Task not serializable".
>> So do I have to implement a Kryo serializer for Array[T] if it is used in
>> a broadcast?
>> Thanks
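One possible explanation, offered as an assumption rather than something confirmed in this thread: a custom Partitioner travels with the shuffle dependency, which Spark serializes with its closure serializer (plain Java serialization by default), so Kryo registration does not apply to it. ArrayPartitioner copies bc.value into a `val` field when it is constructed, so the Array[ImmutableBytesWritable] itself has to be Java-serialized, and ImmutableBytesWritable does not implement java.io.Serializable. SingleElementPartitioner keeps only the Broadcast handle and reads bc.value inside a `def`. The sketch below reproduces that difference with plain Scala and java.io only; Payload, Handle, EagerHolder and LazyHolder are hypothetical stand-ins, not Spark or HBase classes.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for ImmutableBytesWritable: deliberately NOT java.io.Serializable.
class Payload(val bytes: Array[Byte])

// Hypothetical stand-in for a Broadcast handle: the handle itself is serializable,
// but the wrapped value is @transient, so Java serialization never writes it.
class Handle[T](init: T) extends Serializable {
  @transient private var v: T = init
  def value: T = v
}

// Mirrors ArrayPartitioner: copies the value into a field at construction time,
// so the Array[Payload] becomes part of the object's serialized state.
class EagerHolder(h: Handle[Array[Payload]]) extends Serializable {
  val arr: Array[Payload] = h.value
}

// Mirrors SingleElementPartitioner: keeps only the handle and reads the value on demand.
class LazyHolder(h: Handle[Array[Payload]]) extends Serializable {
  def arr: Array[Payload] = h.value
}

object SerializationCheck {
  /** True if `o` can be written with plain Java serialization. */
  def javaSerializable(o: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(o) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val h = new Handle(Array(new Payload(Array[Byte](1, 2, 3))))
    println(s"eager holder serializable: ${javaSerializable(new EagerHolder(h))}") // false
    println(s"lazy holder serializable:  ${javaSerializable(new LazyHolder(h))}")  // true
  }
}
```

If this diagnosis holds, the analogous change in ArrayPartitioner would be to replace `val arr = bc.value` with `def arr = bc.value` (or a `@transient lazy val`), so that only the broadcast handle is shipped with the partitioner.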

2014-11-11 22:35 GMT+08:00 Ritesh Kumar Singh :

Re: All executors run on just a few nodes

2014-10-19 Thread Tao Xiao

Thank you.

But I read in another thread
that "PROCESS_LOCAL" means the data is in the same JVM as the running
code. If the data is in the same JVM as the running code, it must also be
on the same node as that JVM, i.e., the data can be said to be

Also, you said that the tasks will be assigned to available executors that
satisfy the application's requirements. But what requirements must an
executor satisfy for a task to be assigned to it? Do you mean resources
(memory, CPU)?

Finally, is there any way to guarantee that an application's executors
will run on all Spark nodes when the data to be processed is big
enough (for example, HBase data residing on all RegionServers)?

2014-10-20 11:35 GMT+08:00 raymond :

> My best guess is that the speed at which your executors register with the
> driver differs between runs.
> When you ran it the first time, the executors were not fully registered
> when the TaskSetManager started to assign tasks, so the tasks were
> assigned to the executors that were already available and satisfied what
> you need, say all 86 in one batch.
> And "PROCESS_LOCAL" does not necessarily mean that the data is local;
> it could be that the executor for the data source is not available yet (in
> your case, it might become available later).
> If this is the case, you could just sleep a few seconds before running the
> job. There are also some related patches that provide other ways to sync
> executor status before running applications, but I haven't tracked their
> status for a while.
> Raymond
> On Oct 20, 2014, at 11:22 AM, Tao Xiao wrote:
> Hi all,
> I have a Spark 0.9 cluster with 16 nodes.
> I wrote a Spark application to read data from an HBase table, which has 86
> regions spread across 20 RegionServers.
> I submitted the Spark app in Spark standalone mode and found that there
> were 86 executors running on just 3 nodes, and it took about 30 minutes to
> read data from the table. In this case, I noticed from the Spark master UI
> that the Locality Level of all executors was "PROCESS_LOCAL".
> Later I ran the same app again (without changing any code) and found that
> those 86 executors were running on 16 nodes, and this time it took just 4
> minutes to read data from the same HBase table. In this case, I noticed
> that the Locality Level of most executors was "NODE_LOCAL".
> After testing multiple times, I found that the two cases above occur randomly.
> So I have 2 questions:
> 1) Why would the two cases above occur randomly when I submit the same
> application multiple times?
> 2) Would the spread of executors influence the locality level?
> Thank you.
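Raymond's suggestion to sleep before running the job can be made a little more targeted by polling until enough executors have registered, with a timeout. This is a minimal sketch assuming you know how many executors to expect; ExecutorWait, waitForExecutors and the `registered` callback are hypothetical names, and the polling loop is plain Scala, not a Spark API.

```scala
import scala.annotation.tailrec

object ExecutorWait {
  /** Polls `registered()` every `pollMs` milliseconds until it reports at least
    * `expected` executors or `timeoutMs` has elapsed. Returns the last count
    * observed, so the caller can decide whether to proceed anyway. */
  def waitForExecutors(expected: Int, timeoutMs: Long, pollMs: Long = 500L)(
      registered: () => Int): Int = {
    val deadline = System.currentTimeMillis() + timeoutMs

    @tailrec
    def loop(): Int = {
      val n = registered()
      if (n >= expected || System.currentTimeMillis() >= deadline) n
      else {
        Thread.sleep(pollMs)
        loop()
      }
    }

    loop()
  }
}
```

In a driver, the callback could be something like `() => sc.getExecutorMemoryStatus.size - 1`, on the assumption that getExecutorMemoryStatus also lists the driver's own block manager. Later Spark releases (1.1 onwards) added the settings `spark.scheduler.minRegisteredResourcesRatio` and `spark.scheduler.maxRegisteredResourcesWaitingTime` for a similar purpose; check the configuration docs for your version.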

Re: ClassNotFoundException was thrown while trying to save rdd

2014-10-13 Thread Tao Xiao
Thanks Akhil. Both ways work for me, but I'd like to know why that
exception was thrown. The class HBaseApp and related classes were all
contained in my application jar, so why was
*com.xt.scala.HBaseApp$$anonfun$testHBase$1* not found?

2014-10-13 14:53 GMT+08:00 Akhil Das :

> Adding your application jar to the sparkContext will resolve this issue.
> Eg:
> sparkContext.addJar("./target/scala-2.10/myTestApp_2.10-1.0.jar")
> Thanks
> Best Regards
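Akhil's suggestion needs the path of the application jar. Rather than hard-coding it, one option is to ask the JVM where the application's classes were loaded from. This is a sketch assuming the driver itself was started from that jar; JarLocator and locationOf are hypothetical helper names built on the standard `java.lang.Class#getProtectionDomain` API, not a Spark API.

```scala
object JarLocator {
  /** Location (a jar file or a class directory) that `cls` was loaded from,
    * or None when the class loader does not report one, as with JDK
    * bootstrap classes such as java.lang.String. */
  def locationOf(cls: Class[_]): Option[String] =
    Option(cls.getProtectionDomain)
      .flatMap(pd => Option(pd.getCodeSource))
      .flatMap(cs => Option(cs.getLocation))
      .map(_.getPath)
}
```

In HBaseApp it might be used as `sparkContext.addJar(JarLocator.locationOf(HBaseApp.getClass).get)`, or the path could be passed to `SparkConf.setJars` before the context is created. Note that when running from an IDE the location may be a class directory rather than a jar, which addJar cannot ship.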
> On Mon, Oct 13, 2014 at 8:42 AM, Tao Xiao 
> wrote:
>> In the beginning I tried to read from HBase and found that the exception
>> was thrown, so I started to debug the app. I removed the code reading
>> HBase and tried to save an RDD containing a list, and the exception was
>> still thrown. So I'm sure that the exception was not caused by reading HBase.
>> While debugging I did not change the object name or the file name.
>> 2014-10-13 0:00 GMT+08:00 Ted Yu :
>>> Your app is named scala.HBaseApp.
>>> Does it read from / write to HBase?
>>> Just curious.
>>> On Sun, Oct 12, 2014 at 8:00 AM, Tao Xiao 
>>> wrote:
>>>> Hi all,
>>>> I'm using CDH 5.0.1 (Spark 0.9) and submitting a job in Spark
>>>> Standalone Cluster mode.
>>>> The job is quite simple:
>>>> package com.xt.scala
>>>>
>>>> import org.apache.spark.{SparkConf, SparkContext}
>>>>
>>>> object HBaseApp {
>>>>   def main(args: Array[String]): Unit = {
>>>>     testHBase("student", "/test/xt/saveRDD")
>>>>   }
>>>>
>>>>   def testHBase(tableName: String, outFile: String): Unit = {
>>>>     val sparkConf = new SparkConf()
>>>>       .setAppName("-- Test HBase --")
>>>>       .set("spark.executor.memory", "2g")
>>>>       .set("spark.cores.max", "16")
>>>>     val sparkContext = new SparkContext(sparkConf)
>>>>     val rdd = sparkContext.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)
>>>>     val c = rdd.count // successful
>>>>     println("\n\n\n" + c + "\n\n\n")
>>>>     // The next line throws
>>>>     // "java.lang.ClassNotFoundException: com.xt.scala.HBaseApp$$anonfun$testHBase$1"
>>>>     rdd.saveAsTextFile(outFile)
>>>>     println("\n  down  \n")
>>>>   }
>>>> }
>>>> I submitted this job using the following script:
>>>> #!/bin/bash
>>>> HBASE_CLASSPATH=$(hbase classpath)
>>>> APP_JAR=/usr/games/spark/xt/SparkDemo-0.0.1-SNAPSHOT.jar
>>>> SPARK_ASSEMBLY_JAR=/usr/games/spark/xt/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar
>>>> SPARK_MASTER=spark://
>>>> export SPARK_CLASSPATH=/usr/lib/hbase/lib/*
>>>> CONFIG_OPTS="-Dspark.master=$SPARK_MASTER"
>>>> java -cp $CLASSPATH $CONFIG_OPTS com.xt.scala.HBaseApp $@
>>>> After I submitted the job, the count of the RDD was computed
>>>> successfully, but the RDD could not be saved to HDFS, and the following
>>>> exception was thrown:
>>>> 14/10/11 16:09:33 WARN scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException
>>>> java.lang.ClassNotFoundException: com.xt.scala.HBaseApp$$anonfun$testHBase$1
>>>>  at java.lang.ClassLoader.loadClass(
>>>>  at java.lang.ClassLoader.loadClass(
>>>>  at java.lang.Class.forName0(Native Method)
>>>>  at java.lang.Class.forName(
>>>>  at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)

Re: ClassNotFoundException was thrown while trying to save rdd

2014-10-12 Thread Tao Xiao
In the beginning I tried to read from HBase and found that the exception was
thrown, so I started to debug the app. I removed the code reading HBase and
tried to save an RDD containing a list, and the exception was still thrown.
So I'm sure that the exception was not caused by reading HBase.

While debugging I did not change the object name or the file name.

2014-10-13 0:00 GMT+08:00 Ted Yu :

> Your app is named scala.HBaseApp.
> Does it read from / write to HBase?
> Just curious.
> On Sun, Oct 12, 2014 at 8:00 AM, Tao Xiao 
> wrote:
>> Hi all,
>> I'm using CDH 5.0.1 (Spark 0.9) and submitting a job in Spark Standalone
>> Cluster mode.
>> The job is quite simple:
>> package com.xt.scala
>>
>> import org.apache.spark.{SparkConf, SparkContext}
>>
>> object HBaseApp {
>>   def main(args: Array[String]): Unit = {
>>     testHBase("student", "/test/xt/saveRDD")
>>   }
>>
>>   def testHBase(tableName: String, outFile: String): Unit = {
>>     val sparkConf = new SparkConf()
>>       .setAppName("-- Test HBase --")
>>       .set("spark.executor.memory", "2g")
>>       .set("spark.cores.max", "16")
>>     val sparkContext = new SparkContext(sparkConf)
>>     val rdd = sparkContext.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)
>>     val c = rdd.count // successful
>>     println("\n\n\n" + c + "\n\n\n")
>>     // The next line throws
>>     // "java.lang.ClassNotFoundException: com.xt.scala.HBaseApp$$anonfun$testHBase$1"
>>     rdd.saveAsTextFile(outFile)
>>     println("\n  down  \n")
>>   }
>> }
>> I submitted this job using the following script:
>> #!/bin/bash
>> HBASE_CLASSPATH=$(hbase classpath)
>> APP_JAR=/usr/games/spark/xt/SparkDemo-0.0.1-SNAPSHOT.jar
>> SPARK_ASSEMBLY_JAR=/usr/games/spark/xt/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar
>> SPARK_MASTER=spark://
>> export SPARK_CLASSPATH=/usr/lib/hbase/lib/*
>> CONFIG_OPTS="-Dspark.master=$SPARK_MASTER"
>> java -cp $CLASSPATH $CONFIG_OPTS com.xt.scala.HBaseApp $@
>> After I submitted the job, the count of the RDD was computed
>> successfully, but the RDD could not be saved to HDFS, and the following
>> exception was thrown:
>> 14/10/11 16:09:33 WARN scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException
>> java.lang.ClassNotFoundException: com.xt.scala.HBaseApp$$anonfun$testHBase$1
>>  at java.lang.ClassLoader.loadClass(
>>  at java.lang.ClassLoader.loadClass(
>>  at java.lang.Class.forName0(Native Method)
>>  at java.lang.Class.forName(
>>  at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
>>  at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke(
>>  at sun.reflect.DelegatingMethodA

2014-10-08 15:09 GMT+08:00 Sean Owen :

2014-10-08 13:44 GMT+08:00 Sean Owen :

2014-10-02 0:58 GMT+08:00 Vladimir Rodionov :

2014-10-01 12:04 GMT+08:00 Ted Yu :

2014-09-30 12:17 GMT+08:00 Vladimir Rodionov :

> HBase TableInputFormat creates one input split per region. You cannot
> achieve a high level of parallelism unless you have at least 5-10 regions
> per RS. What does that mean? You probably have too few regions. You can
> verify that in the HBase Web UI.
> -Vladimir Rodionov
> On Mon, Sep 29, 2014 at 7:21 PM, Tao Xiao 
> wrote:
>> I submitted a job in yarn-client mode which simply reads from an HBase
>> table containing tens of millions of records and then does a *count* action.
>> The job runs for much longer than I expected, so I wonder whether that
>> is because there is too much data to read. Actually, there are 20 nodes in
>> my Hadoop cluster, so the HBase table does not seem that big (tens of
>> millions of records).
>> I'm using CDH 5.0.0 (Spark 0.9 and HBase 0.96).
>> BTW, while the job was running I could see logs on the console, and
>> specifically I'd like to know what the following log means:
>> 14/09/30 09:45:20 INFO scheduler.TaskSetManager: Starting task 0.0:20 as
>> TID 20 on executor 2: (PROCESS_LOCAL)
>> 14/09/30 09:45:20 INFO scheduler.TaskSetManager: Serialized task 0.0:20
>> as 13454 bytes in 0 ms
>> 14/09/30 09:45:20 INFO scheduler.TaskSetManager: Finished TID 19 in 16426
>> ms on (progress: 18/86)
>> 14/09/30 09:45:20 INFO scheduler.DAGScheduler: Completed ResultTask(0, 19)
>> Thanks
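Vladimir's point can be checked against the log above: with TableInputFormat producing one split (and therefore one task) per region, the "progress: 18/86" line implies 86 regions. Assuming each of the 20 nodes hosts a RegionServer (the thread does not say so), that is 4.3 regions per server, below the 5-10 he suggests. The helper names below are illustrative only.

```scala
object RegionMath {
  /** Average number of regions hosted by each RegionServer. */
  def regionsPerServer(regions: Int, servers: Int): Double =
    regions.toDouble / servers

  /** Total region-count range implied by a per-server target such as 5-10. */
  def targetRegions(servers: Int, low: Int = 5, high: Int = 10): (Int, Int) =
    (servers * low, servers * high)

  def main(args: Array[String]): Unit = {
    val regions = 86 // one split per region, matching "progress: 18/86"
    val servers = 20 // assumption: one RegionServer per node
    println(regionsPerServer(regions, servers)) // 4.3
    println(targetRegions(servers))             // (100,200)
  }
}
```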

2014-09-30 10:21 GMT+08:00 Tao Xiao :

2014-09-15 14:57 GMT+08:00 x :

> How about this.
> scala> val rdd2 = rdd.combineByKey(
>  | (v: Int) => v.toLong,
>  | (c: Long, v: Int) => c + v,
>  | (c1: Long, c2: Long) => c1 + c2)
> rdd2: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[9] at
> combineByKey at :14
> xj @ Tokyo
> On Mon, Sep 15, 2014 at 3:06 PM, Tao Xiao 
> wrote:
>> I followed an example presented in the tutorial Learning Spark
>> to compute the per-key average as follows:
>> import org.apache.spark.{SparkConf, SparkContext}
>> import org.apache.spark.SparkContext._ // pair-RDD functions such as combineByKey
>>
>> val Array(appName) = args
>> val sparkConf = new SparkConf()
>>   .setAppName(appName)
>> val sc = new SparkContext(sparkConf)
>> /*
>>  * compute the per-key average of values
>>  * results should be:
>>  *   A : 5.8
>>  *   B : 14
>>  *   C : 60.6
>>  */
>> val rdd = sc.parallelize(List(
>>   ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
>>   ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
>>   ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2)
>> val avg = rdd.combineByKey(
>>     (x: Int) => (x, 1), // java.lang.ClassCastException: scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer
>>     (acc: (Int, Int), x) => (acc._1 + x, acc._2 + 1),
>>     (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
>>   .map { case (s, t) => (s, t._1 / t._2.toFloat) }
>> avg.collect.foreach(t => println(t._1 + " ->" + t._2))
>> When I submitted the application, a "*java.lang.ClassCastException:
>> scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer*" was thrown.
>> The tutorial said that the first function of *combineByKey*,
>> *(x:Int) => (x, 1)*, should take a single element in the source RDD and
>> return an element of the desired type in the resulting RDD. In my
>> application, we take a single element of type *Int* from the source RDD
>> and return a tuple of type (*Int*, *Int*), which meets the requirements
>> quite well. So why would such an exception be thrown?
>> I'm using CDH 5.0 and Spark 0.9
>> Thanks.
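To make the roles of the three functions concrete, here is a local, Spark-free imitation of combineByKey's semantics applied to the same data. It is a sketch under stated assumptions, not Spark's implementation: localCombineByKey is a hypothetical helper, and the two-way split of the list is an assumed partitioning. It reproduces the expected averages but does not by itself explain the ClassCastException.

```scala
object LocalCombine {
  /** Hypothetical helper imitating combineByKey on in-memory "partitions":
    * inside each partition the first value seen for a key goes through
    * createCombiner and later values through mergeValue; the per-partition
    * results are then merged across partitions with mergeCombiners. */
  def localCombineByKey[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case Some(c) => acc.updated(k, mergeValue(c, v))
          case None    => acc.updated(k, createCombiner(v))
        }
      }
    }
    perPartition.foldLeft(Map.empty[K, C]) { (merged, part) =>
      part.foldLeft(merged) { case (acc, (k, c)) =>
        acc.updated(k, acc.get(k).map(mergeCombiners(_, c)).getOrElse(c))
      }
    }
  }
}

object AverageDemo {
  val data: List[(String, Int)] = List(
    ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
    ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
    ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55))

  // Assumed two-way split; key "B" spans both parts, so mergeCombiners is exercised.
  val partitions: Seq[Seq[(String, Int)]] = Seq(data.take(7), data.drop(7))

  // (sum, count) per key, built with the same three functions as the question.
  val sumsAndCounts: Map[String, (Int, Int)] =
    LocalCombine.localCombineByKey(partitions)(
      (x: Int) => (x, 1),
      (acc: (Int, Int), x: Int) => (acc._1 + x, acc._2 + 1),
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))

  val averages: Map[String, Float] =
    sumsAndCounts.map { case (k, (sum, count)) => k -> sum / count.toFloat }

  def main(args: Array[String]): Unit =
    averages.toSeq.sortBy(_._1).foreach { case (k, v) => println(s"$k -> $v") }
}
```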

2014-09-03 10:28 GMT+08:00 Tao Xiao :

2014-08-31 23:10 GMT+08:00 Yi Tian :

2014-02-26 15:19 GMT+08:00 Matei Zaharia :

2014-02-26 8:56 GMT+08:00 Mayur Rustagi :

