Re: Array in broadcast can't be serialized

2015-02-16 Thread Tao Xiao
Thanks Ted

After searching for a whole day, I still don't know how to make Spark use
Twitter chill serialization; there is very little documentation about
integrating Twitter chill into Spark for serialization. I tried the
following, but it threw "java.lang.ClassCastException:
com.twitter.chill.WrappedArraySerializer cannot be cast to
org.apache.spark.serializer.Serializer":

val conf = new SparkConf()
   .setAppName("Test Serialization")
   .set("spark.serializer",
"com.twitter.chill.WrappedArraySerializer")


So what is the correct way to configure Spark to use the Twitter chill
serialization framework ?
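
For what it's worth, spark.serializer must name a subclass of
org.apache.spark.serializer.Serializer, which is why the cast fails;
chill's WrappedArraySerializer is a Kryo serializer, not a Spark one, and
Spark's own KryoSerializer is itself built on Twitter chill. A minimal
sketch of wiring the chill serializer in through a registrator instead
(ChillRegistrator is a hypothetical name; this assumes chill-scala is on
the classpath and that WrappedArraySerializer has a no-arg constructor):

    import com.esotericsoftware.kryo.Kryo
    import com.twitter.chill.WrappedArraySerializer
    import org.apache.spark.serializer.KryoRegistrator
    import scala.collection.mutable.WrappedArray

    class ChillRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        // Register chill's serializer for the concrete WrappedArray class
        kryo.register(classOf[WrappedArray.ofRef[_]],
          new WrappedArraySerializer[AnyRef])
      }
    }

    val conf = new SparkConf()
      .setAppName("Test Serialization")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "xt.ChillRegistrator")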

2015-02-15 22:23 GMT+08:00 Ted Yu :

> I was looking at https://github.com/twitter/chill
>
> It seems this would achieve what you want:
> chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala
>
> Cheers
>
> On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao 
> wrote:
>
>> I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be
>> serialized by Kryo but *Array[ImmutableBytesWritable] *can't be
>> serialized even when I registered both of them in Kryo.
>>
>> The code is as follows:
>>
>>val conf = new SparkConf()
>> .setAppName("Hello Spark")
>> .set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>> .set("spark.kryo.registrator", "xt.MyKryoRegistrator")
>>
>> val sc = new SparkContext(conf)
>>
>> val rdd = sc.parallelize(List(
>> (new ImmutableBytesWritable(Bytes.toBytes("AAA")),
>> new KeyValue()),
>> (new ImmutableBytesWritable(Bytes.toBytes("BBB")),
>> new KeyValue()),
>> (new ImmutableBytesWritable(Bytes.toBytes("CCC")),
>> new KeyValue()),
>> (new ImmutableBytesWritable(Bytes.toBytes("DDD")),
>> new KeyValue())), 4)
>>
>> // snippet 1:  a single object of *ImmutableBytesWritable* can
>> be serialized in broadcast
>> val partitioner = new SingleElementPartitioner(sc.broadcast(new
>> ImmutableBytesWritable(Bytes.toBytes(3
>> val ret = rdd.aggregateByKey(List[KeyValue](),
>> partitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
>>  (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys ).persist()
>> println("\n\n\ret.count = " + ret.count + ",  partition size = "
>> + ret.partitions.size)
>>
>> // snippet 2: an array of *ImmutableBytesWritable* can not be
>> serialized in broadcast
>> val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)), new
>> ImmutableBytesWritable(Bytes.toBytes(2)), new
>> ImmutableBytesWritable(Bytes.toBytes(3)))
>> val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
>> val ret1 = rdd.aggregateByKey(List[KeyValue](),
>> newPartitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
>>  (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys )
>> println("\n\n\nrdd2.count = " + ret1.count)
>>
>> sc.stop
>>
>>
>>   // the following are kryo registrator and partitioners
>>class MyKryoRegistrator extends KryoRegistrator {
>> override def registerClasses(kryo: Kryo): Unit = {
>>  kryo.register(classOf[ImmutableBytesWritable])   //
>> register ImmutableBytesWritable
>>  kryo.register(classOf[Array[ImmutableBytesWritable]])
>>  // register Array[ImmutableBytesWritable]
>> }
>>}
>>
>>class SingleElementPartitioner(bc:
>> Broadcast[ImmutableBytesWritable]) extends Partitioner {
>> override def numPartitions: Int = 5
>> def v = Bytes.toInt(bc.value.get)
>> override def getPartition(key: Any): Int =  v - 1
>>}
>>
>>
>> class ArrayPartitioner(bc:
>> Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
>> val arr = bc.value
>> override def numPartitions: Int = arr.length
>> override def getPartition(key: Any): Int =
>> Bytes.toInt(arr(0).get)
>> }
>>
>>
>>
>> In the code above, snippet 1 can work as expected. But snippet 2 throws
>> "Task not serializable: java.io.NotSerializableException:
>> org.apache.hadoop.hbase.io.ImmutableBytesWritable"  .
>>
>>
>> So do I have to implement a Kryo serializer for Array[T] if it is used in
>> broadcast ?
>>
>> Thanks
>>
>>
>>
>>
>>
>


Array in broadcast can't be serialized

2015-02-15 Thread Tao Xiao
I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be
serialized by Kryo but *Array[ImmutableBytesWritable] *can't be serialized
even when I registered both of them in Kryo.

The code is as follows:

   val conf = new SparkConf()
.setAppName("Hello Spark")
.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "xt.MyKryoRegistrator")

val sc = new SparkContext(conf)

val rdd = sc.parallelize(List(
(new ImmutableBytesWritable(Bytes.toBytes("AAA")), new
KeyValue()),
(new ImmutableBytesWritable(Bytes.toBytes("BBB")), new
KeyValue()),
(new ImmutableBytesWritable(Bytes.toBytes("CCC")), new
KeyValue()),
(new ImmutableBytesWritable(Bytes.toBytes("DDD")), new
KeyValue())), 4)

// snippet 1:  a single object of *ImmutableBytesWritable* can be
serialized in broadcast
val partitioner = new SingleElementPartitioner(sc.broadcast(new
ImmutableBytesWritable(Bytes.toBytes(3
val ret = rdd.aggregateByKey(List[KeyValue](),
partitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
 (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys ).persist()
println("\n\n\ret.count = " + ret.count + ",  partition size = " +
ret.partitions.size)

// snippet 2: an array of *ImmutableBytesWritable* can not be
serialized in broadcast
val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)), new
ImmutableBytesWritable(Bytes.toBytes(2)), new
ImmutableBytesWritable(Bytes.toBytes(3)))
val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
val ret1 = rdd.aggregateByKey(List[KeyValue](),
newPartitioner)((xs:List[KeyValue], y:KeyValue) => y::xs,
 (xs:List[KeyValue], ys:List[KeyValue]) => xs:::ys )
println("\n\n\nrdd2.count = " + ret1.count)

sc.stop


  // the following are kryo registrator and partitioners
   class MyKryoRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo): Unit = {
 kryo.register(classOf[ImmutableBytesWritable])   //
register ImmutableBytesWritable
 kryo.register(classOf[Array[ImmutableBytesWritable]])   // register Array[ImmutableBytesWritable]
}
   }

   class SingleElementPartitioner(bc:
Broadcast[ImmutableBytesWritable]) extends Partitioner {
override def numPartitions: Int = 5
def v = Bytes.toInt(bc.value.get)
override def getPartition(key: Any): Int =  v - 1
   }


class ArrayPartitioner(bc:
Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
val arr = bc.value
override def numPartitions: Int = arr.length
override def getPartition(key: Any): Int =
Bytes.toInt(arr(0).get)
}



In the code above, snippet 1 works as expected, but snippet 2 throws
"Task not serializable: java.io.NotSerializableException:
org.apache.hadoop.hbase.io.ImmutableBytesWritable".


So do I have to implement a Kryo serializer for Array[T] if it is used in
broadcast ?
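
One plausible explanation, judging from the code above: SingleElementPartitioner
keeps only the Broadcast handle as a field and dereferences it inside a def,
while ArrayPartitioner stores val arr = bc.value, so the array itself becomes
a field of the partitioner and is serialized along with the task by Java
serialization, where ImmutableBytesWritable is not java.io.Serializable. A
sketch of the fix under that assumption, needing no extra Kryo serializer:

    class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]])
        extends Partitioner {
      // Keep only the Broadcast handle as state; dereference bc.value
      // lazily so the array never rides along with Java task serialization.
      override def numPartitions: Int = bc.value.length
      override def getPartition(key: Any): Int = Bytes.toInt(bc.value(0).get)
    }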

Thanks


Can not write out data as snappy-compressed files

2014-12-08 Thread Tao Xiao
I'm using CDH 5.1.0 and Spark 1.0.0, and I'd like to write out data
as snappy-compressed files but encountered a problem.

My code is as follows:

  val InputTextFilePath = "hdfs://ec2.hadoop.com:8020/xt/text/new.txt"
  val OutputTextFilePath = "hdfs://ec2.hadoop.com:8020/xt/compressedText/"

  val sparkConf = new SparkConf()
.setAppName("compress data").setMaster(Master)
.set("spark.executor.memory", "8g")
.set("spark.cores.max", "12")
.set("spark.io.compression.codec",
"org.apache.spark.io.SnappyCompressionCodec")

  val sc = new SparkContext(sparkConf)

  val rdd = sc.textFile(InputTextFilePath)
  rdd.saveAsTextFile(OutputTextFilePath,
classOf[org.apache.hadoop.io.compress.SnappyCodec])

  sc.stop


When I submitted the job the following exception was thrown:
14/12/08 21:16:15 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/12/08 21:16:21 WARN TaskSetManager: Lost TID 0 (task 0.0:0)
14/12/08 21:16:21 WARN TaskSetManager: Loss was due to
java.lang.UnsatisfiedLinkError
java.lang.UnsatisfiedLinkError:
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
 at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native
Method)
 at
org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
 at
org.apache.hadoop.io.compress.SnappyCodec.createCompressor(SnappyCodec.java:143)
 at
org.apache.hadoop.io.compress.SnappyCodec.createOutputStream(SnappyCodec.java:98)
 at
org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:136)
 at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:89)
 at org.apache.spark.rdd.PairRDDFunctions.org
$apache$spark$rdd$PairRDDFunctions$$writeToFile$1(PairRDDFunctions.scala:825)
 at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:840)
 at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:840)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

It seems that Spark could not find snappy's native library.
I searched for solutions on the Internet and tried the following ways:

Adding "
spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec" to
*/etc/spark/conf.cloudera.spark/spark-defaults.conf*

Adding the following configurations to
*/etc/spark/conf.cloudera.spark/spark-env.sh*
export
JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop/lib/native
export
LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop/lib/native
export
SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop/lib/native/libsnappy.so
export
SPARK_CLASSPATH=/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop/lib/snappy-java-1.0.4.1.jar

Yet neither works. Can anybody tell me what I should do to read and write
Snappy-compressed files in Spark ?
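
In case it helps: the UnsatisfiedLinkError is raised inside the executor
JVMs, so the native library path has to reach them as well, not only the
shell that launches the driver. A sketch, assuming
spark.executor.extraLibraryPath is honored by this Spark 1.0 build (check
the 1.0 configuration page) and reusing the parcel path from above:

  val sparkConf = new SparkConf()
    .setAppName("compress data").setMaster(Master)
    .set("spark.executor.extraLibraryPath",
      "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop/lib/native")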

Any answer is appreciated.


A partitionBy problem

2014-11-18 Thread Tao Xiao
Hi all,

 I tested the *partitionBy* feature in a wordcount application, and I'm
puzzled by a phenomenon. In this application, I created an rdd from some
text files in HDFS (about 100 GB in size), each of which has lines composed
of words separated by the character "#". I wanted to count the occurrence of
each distinct word. *All lines have the same contents, so the final result
should be very small in bytes*.  The code is as follows:

  val text = sc.textFile(inputDir)
  val tuples = text.flatMap(line => line.split("#"))
   .map((_, 1))
   .reduceByKey(_ + _)
  tuples.collect.foreach{ case (word, count) => println(word + " -> " +
count)}

I submitted the application to a Spark cluster of 5 nodes and ran it in
standalone mode. From the application UI, we can see that the shuffle
process for *collect* and *reduceByKey* occupied small bandwidth
(766.4 KB for *collect*'s shuffle read and 961 KB for *reduceByKey*'s
shuffle write).

*However, the shuffle process occupied quite large bandwith when I
added partitionBy like this:*

  val text = sc.textFile(inputDir)
   val tuples = text.flatMap(line => line.split("#"))
.map((_, 1))
.partitionBy(new HashPartitioner(100))
.reduceByKey(_ + _)
  tuples.collect.foreach{ case (word, count) => println(word + " -> " +
count)}

From the application UI, we can see that the shuffle read for *collect* is
2.8 GB and the shuffle write for *map* is 3.5 GB.

The *map* transformations are applied on 5 nodes of the cluster because the
HDFS blocks are distributed among these 5 nodes. The *map* transformation is
applied to each element of the rdd on different nodes and doesn't need to
shuffle the new rdd. *So my first question is: why did the map
transformation occupy such large bandwidth (3.5 GB) when I added
partitionBy to the code?*

When *collect* is applied, it needs to collect the results, namely (*word*,
*totalCount*) tuples, from 5 nodes to the driver. That process should occupy
very small bandwidth because all lines have the same contents, like
"AAA#BBB#CCC#DDD", which means the final results that *collect* retrieved
should be very small in bytes (for example, hundreds of KB). *So my second
question is: why did the collect action occupy such large bandwidth (2.8 GB)
when I added partitionBy to the code?*

*And the third question: when I add partitionBy to an rdd, it returns a new
rdd. Does that mean the rdd will be immediately shuffled across nodes to
meet the requirement specified by the supplied partitioner, or is the
supplied partitioner merely a hint indicating how to partition the rdd
later?*
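
As a side note on the first and third questions: reduceByKey alone combines
values map-side before shuffling, whereas inserting partitionBy between map
and reduceByKey forces every raw (word, 1) pair across the network first,
which would explain the large shuffle sizes; and partitionBy is lazy like
other transformations, so the shuffle only happens when an action runs. A
sketch that keeps the custom partitioning but preserves map-side combining,
using the reduceByKey overload that takes a partitioner:

  val tuples = text.flatMap(line => line.split("#"))
   .map((_, 1))
   .reduceByKey(new HashPartitioner(100), _ + _) // combine map-side, then shuffle once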

Thanks.


Re: How to kill a Spark job running in cluster mode ?

2014-11-12 Thread Tao Xiao
Thanks for your replies.

Actually, we can kill a driver with the command "bin/spark-class
org.apache.spark.deploy.Client kill  " if you know
the driver id.

2014-11-11 22:35 GMT+08:00 Ritesh Kumar Singh :

> There is a property :
>spark.ui.killEnabled
> which needs to be set true for killing applications directly from the
> webUI.
> Check the link:
> Kill Enable spark job
> <http://spark.apache.org/docs/latest/configuration.html#spark-ui>
>
> Thanks
>
> On Tue, Nov 11, 2014 at 7:42 PM, Sonal Goyal 
> wrote:
>
>> The web interface has a kill link. You can try using that.
>>
>> Best Regards,
>> Sonal
>> Founder, Nube Technologies <http://www.nubetech.co>
>>
>> <http://in.linkedin.com/in/sonalgoyal>
>>
>>
>>
>> On Tue, Nov 11, 2014 at 7:28 PM, Tao Xiao 
>> wrote:
>>
>>> I'm using Spark 1.0.0 and I'd like to kill a job running in cluster
>>> mode, which means the driver is not running on local node.
>>>
>>> So how can I kill such a job? Is there a command like "hadoop job -kill
>>> " which kills a running MapReduce job ?
>>>
>>> Thanks
>>>
>>
>>
>


How to kill a Spark job running in cluster mode ?

2014-11-11 Thread Tao Xiao
I'm using Spark 1.0.0 and I'd like to kill a job running in cluster mode,
which means the driver is not running on local node.

So how can I kill such a job? Is there a command like "hadoop job -kill
" which kills a running MapReduce job ?

Thanks


Re: All executors run on just a few nodes

2014-10-19 Thread Tao Xiao
Raymond,

Thank you.

But I read in another thread
<http://apache-spark-user-list.1001560.n3.nabble.com/When-does-Spark-switch-from-PROCESS-LOCAL-to-NODE-LOCAL-or-RACK-LOCAL-td7091.html>
that "PROCESS_LOCAL" means the data is in the same JVM as the code that is
running. When data is in the same JVM as the running code, the data should
be on the same node as the JVM, i.e., the data can be said to be local.

Also, you said that the tasks will be assigned to available executors which
satisfy the application's requirements. But what requirements must an
executor satisfy so that a task can be assigned to it? Do you mean resources
(memory, CPU)?

Finally, is there any way to guarantee that all executors for an
application will run on all Spark nodes when the data to be processed is big
enough (for example, when HBase data resides on all RegionServers)?




2014-10-20 11:35 GMT+08:00 raymond :

> My best guess is that the speed at which your executors got registered
> with the driver differs between runs.
>
> When you ran it the first time, the executors were not fully registered
> when the task set manager started to assign tasks, so the tasks were
> assigned to the executors that were already available and satisfied what
> you need, say 86 in one batch.
>
> And "PROCESS_LOCAL" does not necessarily mean that the data is local;
> it could be that the executor was not yet available for the data source (in
> your case, though, it might become available later).
>
> If this is the case, you could just sleep a few seconds before running the
> job. There are also some related patches providing other ways to sync
> executor status before running applications, but I haven't tracked their
> status for a while.
>
>
> Raymond
>
> On 2014-10-20, at 11:22 AM, Tao Xiao  wrote:
>
> Hi all,
>
> I have a Spark-0.9 cluster, which has 16 nodes.
>
> I wrote a Spark application to read data from an HBase table, which has 86
> regions spreading over 20 RegionServers.
>
> I submitted the Spark app in Spark standalone mode and found that there
> were 86 executors running on just 3 nodes and it took about 30 minutes to
> read data from the table. In this case, I noticed from the Spark master UI
> that the Locality Level of all executors was "PROCESS_LOCAL".
>
> Later I ran the same app again (without any code change) and found that
> those 86 executors were running on 16 nodes, and this time it took just 4
> minutes to read data from the same HBase table. In this case, I noticed
> that the Locality Level of most executors was "NODE_LOCAL".
>
> After testing multiple times, I found the two cases above occur randomly.
>
> So I have 2 questions:
> 1)  Why would the two cases above occur randomly when I submitted the same
> application multiple times ?
> 2)  Would the spread of executors influence locality level ?
>
> Thank you.
>
>
>
>
>
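
A crude sketch of Raymond's workaround above, assuming the registration race
he describes is indeed the cause (the 10-second value is a guess to tune):

    val sc = new SparkContext(conf)
    // Give executors time to register with the driver before the first job
    // is submitted, so tasks are not packed onto the few already available.
    Thread.sleep(10000)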


All executors run on just a few nodes

2014-10-19 Thread Tao Xiao
Hi all,

I have a Spark-0.9 cluster, which has 16 nodes.

I wrote a Spark application to read data from an HBase table, which has 86
regions spreading over 20 RegionServers.

I submitted the Spark app in Spark standalone mode and found that there
were 86 executors running on just 3 nodes and it took about 30 minutes to
read data from the table. In this case, I noticed from the Spark master UI
that the Locality Level of all executors was "PROCESS_LOCAL".

Later I ran the same app again (without any code change) and found that
those 86 executors were running on 16 nodes, and this time it took just 4
minutes to read data from the same HBase table. In this case, I noticed
that the Locality Level of most executors was "NODE_LOCAL".

After testing multiple times, I found the two cases above occur randomly.

So I have 2 questions:
1)  Why would the two cases above occur randomly when I submitted the same
application multiple times ?
2)  Would the spread of executors influence locality level ?

Thank you.


Re: ClassNotFoundException was thrown while trying to save rdd

2014-10-13 Thread Tao Xiao
Thanks Akhil.  Both ways work for me, but I'd like to know why that
exception was thrown. The class HBaseApp and related classes were all
contained in my application jar, so why was
*com.xt.scala.HBaseApp$$anonfun$testHBase$1* not found ?
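
For context, the likely mechanism, given that the job was launched with
plain java -cp rather than spark-submit: the jar was on the driver's
classpath but never shipped to the executors, so the anonymous closure class
could not be loaded there. A sketch of Akhil's addJar suggestion (quoted
below) with the jar path from the submit script:

    // In the application, after creating the SparkContext:
    sparkContext.addJar("/usr/games/spark/xt/SparkDemo-0.0.1-SNAPSHOT.jar")

Alternatively, the jar can be listed in spark.jars at launch time, e.g. by
adding -Dspark.jars=/usr/games/spark/xt/SparkDemo-0.0.1-SNAPSHOT.jar to
CONFIG_OPTS, as the later submit scripts in this archive do.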

2014-10-13 14:53 GMT+08:00 Akhil Das :

> Adding your application jar to the sparkContext will resolve this issue.
>
> Eg:
> sparkContext.addJar("./target/scala-2.10/myTestApp_2.10-1.0.jar")
>
> Thanks
> Best Regards
>
> On Mon, Oct 13, 2014 at 8:42 AM, Tao Xiao 
> wrote:
>
>> In the beginning I tried to read HBase and found that the exception was
>> thrown, so I started to debug the app. I removed the code reading HBase
>> and tried to save an rdd containing a list, and the exception was still
>> thrown. So I'm sure the exception was not caused by reading HBase.
>>
>> While debugging I did not change the object name and file name.
>>
>>
>>
>> 2014-10-13 0:00 GMT+08:00 Ted Yu :
>>
>>> Your app is named scala.HBaseApp
>>> Does it read / write to HBase ?
>>>
>>> Just curious.
>>>
>>> On Sun, Oct 12, 2014 at 8:00 AM, Tao Xiao 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm using CDH 5.0.1 (Spark 0.9)  and submitting a job in Spark
>>>> Standalone Cluster mode.
>>>>
>>>> The job is quite simple as follows:
>>>>
>>>>   object HBaseApp {
>>>> def main(args:Array[String]) {
>>>> testHBase("student", "/test/xt/saveRDD")
>>>> }
>>>>
>>>>
>>>> def testHBase(tableName: String, outFile:String) {
>>>>   val sparkConf = new SparkConf()
>>>> .setAppName("-- Test HBase --")
>>>> .set("spark.executor.memory", "2g")
>>>> .set("spark.cores.max", "16")
>>>>
>>>>   val sparkContext = new SparkContext(sparkConf)
>>>>
>>>>   val rdd = sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
>>>>
>>>>   val c = rdd.count // successful
>>>>   println("\n\n\n"  + c + "\n\n\n")
>>>>
>>>>   rdd.saveAsTextFile(outFile)  // This line will throw
>>>> "java.lang.ClassNotFoundException:
>>>> com.xt.scala.HBaseApp$$anonfun$testHBase$1"
>>>>
>>>>   println("\n  down  \n")
>>>> }
>>>> }
>>>>
>>>> I submitted this job using the following script:
>>>>
>>>> #!/bin/bash
>>>>
>>>> HBASE_CLASSPATH=$(hbase classpath)
>>>> APP_JAR=/usr/games/spark/xt/SparkDemo-0.0.1-SNAPSHOT.jar
>>>>
>>>> SPARK_ASSEMBLY_JAR=/usr/games/spark/xt/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar
>>>> SPARK_MASTER=spark://b02.jsepc.com:7077
>>>>
>>>> CLASSPATH=$CLASSPATH:$APP_JAR:$SPARK_ASSEMBLY_JAR:$HBASE_CLASSPATH
>>>> export SPARK_CLASSPATH=/usr/lib/hbase/lib/*
>>>>
>>>> CONFIG_OPTS="-Dspark.master=$SPARK_MASTER"
>>>>
>>>> java -cp $CLASSPATH $CONFIG_OPTS com.xt.scala.HBaseApp $@
>>>>
>>>> After I submitted the job, the count of rdd could be computed
>>>> successfully, but that rdd could not be saved into HDFS and the following
>>>> exception was thrown:
>>>>
>>>> 14/10/11 16:09:33 WARN scheduler.TaskSetManager: Loss was due to
>>>> java.lang.ClassNotFoundException
>>>> java.lang.ClassNotFoundException:
>>>> com.xt.scala.HBaseApp$$anonfun$testHBase$1
>>>>  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>>>  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>>>  at java.security.AccessController.doPrivileged(Native Method)
>>>>  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>>>  at java.lang.Class.forName0(Native Method)
>>>>  at java.lang.Class.forName(Class.java:270)
>>>>  at
>>>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
>>>>  at
>>>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>>>>  at java.io.ObjectInputStream.readClassDesc(Ob

Re: ClassNotFoundException was thrown while trying to save rdd

2014-10-12 Thread Tao Xiao
In the beginning I tried to read HBase and found that the exception was
thrown, so I started to debug the app. I removed the code reading HBase and
tried to save an rdd containing a list, and the exception was still thrown.
So I'm sure the exception was not caused by reading HBase.

While debugging I did not change the object name and file name.



2014-10-13 0:00 GMT+08:00 Ted Yu :

> Your app is named scala.HBaseApp
> Does it read / write to HBase ?
>
> Just curious.
>
> On Sun, Oct 12, 2014 at 8:00 AM, Tao Xiao 
> wrote:
>
>> Hi all,
>>
>> I'm using CDH 5.0.1 (Spark 0.9)  and submitting a job in Spark Standalone
>> Cluster mode.
>>
>> The job is quite simple as follows:
>>
>>   object HBaseApp {
>> def main(args:Array[String]) {
>> testHBase("student", "/test/xt/saveRDD")
>> }
>>
>>
>> def testHBase(tableName: String, outFile:String) {
>>   val sparkConf = new SparkConf()
>> .setAppName("-- Test HBase --")
>> .set("spark.executor.memory", "2g")
>> .set("spark.cores.max", "16")
>>
>>   val sparkContext = new SparkContext(sparkConf)
>>
>>   val rdd = sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
>>
>>   val c = rdd.count // successful
>>   println("\n\n\n"  + c + "\n\n\n")
>>
>>   rdd.saveAsTextFile(outFile)  // This line will throw
>> "java.lang.ClassNotFoundException:
>> com.xt.scala.HBaseApp$$anonfun$testHBase$1"
>>
>>   println("\n  down  \n")
>> }
>> }
>>
>> I submitted this job using the following script:
>>
>> #!/bin/bash
>>
>> HBASE_CLASSPATH=$(hbase classpath)
>> APP_JAR=/usr/games/spark/xt/SparkDemo-0.0.1-SNAPSHOT.jar
>>
>> SPARK_ASSEMBLY_JAR=/usr/games/spark/xt/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar
>> SPARK_MASTER=spark://b02.jsepc.com:7077
>>
>> CLASSPATH=$CLASSPATH:$APP_JAR:$SPARK_ASSEMBLY_JAR:$HBASE_CLASSPATH
>> export SPARK_CLASSPATH=/usr/lib/hbase/lib/*
>>
>> CONFIG_OPTS="-Dspark.master=$SPARK_MASTER"
>>
>> java -cp $CLASSPATH $CONFIG_OPTS com.xt.scala.HBaseApp $@
>>
>> After I submitted the job, the count of rdd could be computed
>> successfully, but that rdd could not be saved into HDFS and the following
>> exception was thrown:
>>
>> 14/10/11 16:09:33 WARN scheduler.TaskSetManager: Loss was due to
>> java.lang.ClassNotFoundException
>> java.lang.ClassNotFoundException:
>> com.xt.scala.HBaseApp$$anonfun$testHBase$1
>>  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>  at java.security.AccessController.doPrivileged(Native Method)
>>  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>  at java.lang.Class.forName0(Native Method)
>>  at java.lang.Class.forName(Class.java:270)
>>  at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
>>  at
>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>>  at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>>  at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>  at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>  at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>  at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>  at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>  at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>  at
>> sun.reflect.DelegatingMethodA

ClassNotFoundException was thrown while trying to save rdd

2014-10-12 Thread Tao Xiao
Hi all,

I'm using CDH 5.0.1 (Spark 0.9)  and submitting a job in Spark Standalone
Cluster mode.

The job is quite simple as follows:

  object HBaseApp {
def main(args:Array[String]) {
testHBase("student", "/test/xt/saveRDD")
}


def testHBase(tableName: String, outFile:String) {
  val sparkConf = new SparkConf()
.setAppName("-- Test HBase --")
.set("spark.executor.memory", "2g")
.set("spark.cores.max", "16")

  val sparkContext = new SparkContext(sparkConf)

  val rdd = sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)

  val c = rdd.count // successful
  println("\n\n\n"  + c + "\n\n\n")

  rdd.saveAsTextFile(outFile)  // This line will throw
"java.lang.ClassNotFoundException:
com.xt.scala.HBaseApp$$anonfun$testHBase$1"

  println("\n  down  \n")
}
}

I submitted this job using the following script:

#!/bin/bash

HBASE_CLASSPATH=$(hbase classpath)
APP_JAR=/usr/games/spark/xt/SparkDemo-0.0.1-SNAPSHOT.jar
SPARK_ASSEMBLY_JAR=/usr/games/spark/xt/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar
SPARK_MASTER=spark://b02.jsepc.com:7077

CLASSPATH=$CLASSPATH:$APP_JAR:$SPARK_ASSEMBLY_JAR:$HBASE_CLASSPATH
export SPARK_CLASSPATH=/usr/lib/hbase/lib/*

CONFIG_OPTS="-Dspark.master=$SPARK_MASTER"

java -cp $CLASSPATH $CONFIG_OPTS com.xt.scala.HBaseApp $@

After I submitted the job, the count of rdd could be computed successfully,
but that rdd could not be saved into HDFS and the following exception was
thrown:

14/10/11 16:09:33 WARN scheduler.TaskSetManager: Loss was due to
java.lang.ClassNotFoundException
java.lang.ClassNotFoundException: com.xt.scala.HBaseApp$$anonfun$testHBase$1
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:270)
 at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
 at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
 at
org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:63)
 at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139)
 at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
 at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
 at
org.apache

Re: Reading from HBase is too slow

2014-10-08 Thread Tao Xiao
Sean,

I did specify the number of cores to use as follows:

... ...
val sparkConf = new SparkConf()
.setAppName("<<< Reading HBase >>>")
.set("spark.cores.max", "32")
val sc = new SparkContext(sparkConf)
... ...



But that does not solve the problem --- only 2 workers are allocated.

I'm using Spark 0.9 and submitting my job in Yarn client mode.
Actually, setting *spark.cores.max* only applies when the job runs on a
*standalone deploy cluster* or a *Mesos cluster in "coarse-grained" sharing
mode*. Please refer to this link
<http://spark.apache.org/docs/0.9.1/configuration.html>.

So how to specify the number of executors when submitting a Spark 0.9 job
in Yarn Client mode?
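
(For the record, if I remember the 0.9 "Running on YARN" page correctly,
the 0.9 YARN client sizes executors from environment variables exported
before launching, namely SPARK_WORKER_INSTANCES, SPARK_WORKER_CORES and
SPARK_WORKER_MEMORY, rather than from SparkConf settings such as
spark.cores.max.)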

2014-10-08 15:09 GMT+08:00 Sean Owen :

> You do need to specify the number of executor cores to use. Executors are
> not like mappers. After all they may do much more in their lifetime than
> just read splits from HBase so would not make sense to determine it by
> something that the first line of the program does.
> On Oct 8, 2014 8:00 AM, "Tao Xiao"  wrote:
>
>> Hi Sean,
>>
>>Do I need to specify the number of executors when submitting the job?
>> I suppose the number of executors will be determined by the number of
>> regions of the table. Just like a MapReduce job, you needn't specify the
>> number of map tasks when reading from an HBase table.
>>
>>   The script to submit my job can be seen in my second post. Please refer
>> to that.
>>
>>
>>
>> 2014-10-08 13:44 GMT+08:00 Sean Owen :
>>
>>> How did you run your program? I don't see from your earlier post that
>>> you ever asked for more executors.
>>>
>>> On Wed, Oct 8, 2014 at 4:29 AM, Tao Xiao 
>>> wrote:
>>> > I found the reason why reading HBase is too slow. Although each
>>> > regionserver serves multiple regions for the table I'm reading, the
>>> > number of Spark workers allocated by Yarn is too low. Actually, I could
>>> > see that the table has dozens of regions spread over about 20
>>> > regionservers, but only two Spark workers are allocated by Yarn. What
>>> > is worse, the two workers run one after another. So, the Spark job
>>> > lost parallelism.
>>> >
>>> > So now the question is : Why are only 2 workers allocated?
>>>
>>
>>


Re: Reading from HBase is too slow

2014-10-08 Thread Tao Xiao
Hi Sean,

   Do I need to specify the number of executors when submitting the job?  I
suppose the number of executors will be determined by the number of regions
of the table. Just like a MapReduce job, you needn't specify the number of
map tasks when reading from an HBase table.

  The script to submit my job can be seen in my second post. Please refer
to that.



2014-10-08 13:44 GMT+08:00 Sean Owen :

> How did you run your program? I don't see from your earlier post that
> you ever asked for more executors.
>
> On Wed, Oct 8, 2014 at 4:29 AM, Tao Xiao  wrote:
> > I found the reason why reading HBase is too slow. Although each
> > regionserver serves multiple regions for the table I'm reading, the
> > number of Spark workers allocated by Yarn is too low. Actually, I could
> > see that the table has dozens of regions spread over about 20
> > regionservers, but only two Spark workers are allocated by Yarn. What is
> > worse, the two workers run one after another. So, the Spark job lost
> > parallelism.
> >
> > So now the question is : Why are only 2 workers allocated?
>


Re: Reading from HBase is too slow

2014-10-07 Thread Tao Xiao
Here <http://pastebin.com/VhfmHPQe> is the log printed on the console while
the Spark job was running.



2014-10-02 0:58 GMT+08:00 Vladimir Rodionov :

> Yes, it's in 0.98. CDH is free (w/o subscription) and sometimes it's worth
> upgrading to the latest version (which is 0.98-based).
>
> -Vladimir Rodionov
>
> On Wed, Oct 1, 2014 at 9:52 AM, Ted Yu  wrote:
>
>> As far as I know, that feature is not in CDH 5.0.0
>>
>> FYI
>>
>> On Wed, Oct 1, 2014 at 9:34 AM, Vladimir Rodionov <
>> vrodio...@splicemachine.com> wrote:
>>
>>> Using TableInputFormat is not the fastest way of reading data from
>>> HBase. Do not expect 100s of Mb per sec. You probably should take a look at
>>> M/R over HBase snapshots.
>>>
>>> https://issues.apache.org/jira/browse/HBASE-8369
>>>
>>> -Vladimir Rodionov
>>>
>>> On Wed, Oct 1, 2014 at 8:17 AM, Tao Xiao 
>>> wrote:
>>>
>>>> I can submit a MapReduce job reading that table, although its
>>>> processing rate is also a little slower than I expected, but not as
>>>> slow as Spark.
>>>>
>>>>
>>>>
>>
>


Re: Reading from HBase is too slow

2014-10-01 Thread Tao Xiao
I can submit a MapReduce job reading that table, although its processing
rate is also a little slower than I expected, but not as slow as Spark.

2014-10-01 12:04 GMT+08:00 Ted Yu :

> Can you launch a job which exercises TableInputFormat on the same table
> without using Spark ?
>
> This would show whether the slowdown is in HBase code or somewhere else.
>
> Cheers
>
> On Mon, Sep 29, 2014 at 11:40 PM, Tao Xiao 
> wrote:
>
>> I checked HBase UI. Well, this table is not completely evenly spread
>> across the nodes, but I think to some extent it can be seen as nearly
>> evenly spread - at least there is not a single node which has too many
>> regions.  Here is a screenshot of HBase UI
>> <http://imgbin.org/index.php?page=image&id=19539>.
>>
>> Besides, I checked the size of each region in bytes for this table in the
>> HBase shell as follows:
>>
>>
>> -bash-4.1$ hadoop dfs -du -h /hbase/data/default/C_CONS
>> DEPRECATED: Use of this script to execute hdfs command is deprecated.
>> Instead use the hdfs command for it.
>>
>> 288  /hbase/data/default/C_CONS/.tabledesc
>> 0/hbase/data/default/C_CONS/.tmp
>> 159.6 M  /hbase/data/default/C_CONS/0008c2494a5399d68495d9c8ae147821
>> 76.7 M   /hbase/data/default/C_CONS/021d7d21d7faeb7b2a77835d6f86747e
>> 81.3 M   /hbase/data/default/C_CONS/02a39a316ac6d2bda89e72e74aa18a6e
>> 155.3 M  /hbase/data/default/C_CONS/02fe51bc077290febc85651d8ee31abc
>> 173.4 M  /hbase/data/default/C_CONS/045859bcc70e36eb4d33f8ca3b7d9633
>> 82.6 M   /hbase/data/default/C_CONS/05c868b6036cc4f1836f70be6215c851
>> 74.1 M   /hbase/data/default/C_CONS/0816378c837f1f3b84f4d4060d22beb3
>> 84.7 M   /hbase/data/default/C_CONS/083da8f5eb8a5b1cca76376449f357ca
>> 346.6 M  /hbase/data/default/C_CONS/0ac70fcb1baea0896ea069a6bcc30898
>> 333.8 M  /hbase/data/default/C_CONS/0b3be845bd4f5e958e8c9a18c8eaab21
>> 72.7 M   /hbase/data/default/C_CONS/12c13610c50dbc8ab27f20b0ebf2bfc4
>> 76.1 M   /hbase/data/default/C_CONS/1341966315d7e53be719d948d595bee0
>> 72.4 M   /hbase/data/default/C_CONS/1acdbc05c502b11da4852a1f21228f44
>> 70.0 M   /hbase/data/default/C_CONS/1b8f57d65f6c0e4de721e4c8f1944829
>> 183.9 M  /hbase/data/default/C_CONS/1f1ae7ca9f725fcf9639a4d52086fa50
>> 65.5 M   /hbase/data/default/C_CONS/20c10b96e2b9c40684aaeb6d0cfbf7c0
>> 76.0 M   /hbase/data/default/C_CONS/22515194fe09adcd4cbb2f5307303c73
>> 78.4 M   /hbase/data/default/C_CONS/236cd80393cb5b7c526bd2c45ce53a0a
>> 150.0 M  /hbase/data/default/C_CONS/23bd80852f47b97b4122709ec844d4ed
>> 81.6 M   /hbase/data/default/C_CONS/241b8bc415029dedf94c4a84e6c4ad3b
>> 77.9 M   /hbase/data/default/C_CONS/27f1e59bde75ef3096a5bdd3eb402cd7
>> 160.8 M  /hbase/data/default/C_CONS/30c2ae3be38b8cdf3b337054a7d61478
>> 372.2 M  /hbase/data/default/C_CONS/31d606da71b35844d0cdc8a195c97d2e
>> 182.6 M  /hbase/data/default/C_CONS/3274a022bc7419d426cf63caa1cc88e1
>> 92.1 M   /hbase/data/default/C_CONS/344faae7971d87b51edf23f75a7c3746
>> 154.7 M  /hbase/data/default/C_CONS/3b3f0c839bdb32ed2104f67c8a02da41
>> 77.4 M   /hbase/data/default/C_CONS/3cf6b2bd0cfe85f3111d0ba1b84a60b4
>> 71.5 M   /hbase/data/default/C_CONS/3f466db078d07e2bfb11c681e0e3
>> 77.8 M   /hbase/data/default/C_CONS/3f8c1b7dec05118eb9894bb591e32b2f
>> 83.6 M   /hbase/data/default/C_CONS/45e105856fcb54748c48bd45e973a3b9
>> 185.2 M  /hbase/data/default/C_CONS/4becd90d46a2d4a6bd8ecbe02b60892c
>> 165.6 M  /hbase/data/default/C_CONS/4dcebd58c7013062c4a8583012a11b5a
>> 67.3 M   /hbase/data/default/C_CONS/51f845d842605dda66b1ae01ad8a17e8
>> 148.2 M  /hbase/data/default/C_CONS/532189155ab78dbd1e36aac3ab4878a8
>> 172.6 M  /hbase/data/default/C_CONS/5401d9cb19adb9bd78718ea047e6d9d7
>> 139.4 M  /hbase/data/default/C_CONS/547d2a8c54aae73e8f12b4570efd984c
>> 89.5 M   /hbase/data/default/C_CONS/54cbac1f71c7781697052bb2aa1c5a18
>> 101.3 M  /hbase/data/default/C_CONS/55263ce293327683b9c6e6098ec3e89a
>> 85.2 M   /hbase/data/default/C_CONS/55f8c278e35de6bca5083c7a66e355fb
>> 85.8 M   /hbase/data/default/C_CONS/57112558912e1de016327e115bc84f11
>> 171.8 M  /hbase/data/default/C_CONS/572b886cbfe92ddcb97502f041953fb8
>> 51   /hbase/data/default/C_CONS/6bd64d8cf6b38806731f7693bdd673c9
>> 86.6 M   /hbase/data/default/C_CONS/7695703b7b527afc5f3524eee9b5d806
>> 74.8 M   /hbase/data/default/C_CONS/7bb7567685f5e16a4379d7cf79de2ecc
>> 120.1 M  /hbase/data/default/C_CONS/7c144bef991bb3c959d7ef6e2fa5036a
>> 166.0 M  /hbase/data/default/C_CONS/7c7817eb3e531d5bda88b5f0de6a20de
>> 173.5 M  /hbase/data/default/C_CONS/7d07c139575d007ecbb23fa946e39130
>> 139.2 M  /hbase/data/default/C_CONS/8295aa7

Re: Reading from HBase is too slow

2014-09-29 Thread Tao Xiao
ult/C_CONS/b1ae0451f592b28eed8a58908f91293a
91.5 M   /hbase/data/default/C_CONS/b8396049e2b742108add1485c0eb4aeb
81.2 M   /hbase/data/default/C_CONS/b8d25b3e536b4fea5ee4ee2b21885c76
87.8 M   /hbase/data/default/C_CONS/bbfbe319705df23a23a89b40e52d89a8
81.3 M   /hbase/data/default/C_CONS/bccaeedc65d9295289f78aaec588cc3d
95.8 M   /hbase/data/default/C_CONS/c229d583958802571dfaa9a39453df0d
88.5 M   /hbase/data/default/C_CONS/c9d7a038243d1b3e2448a48007f1f9e0
158.8 M  /hbase/data/default/C_CONS/cca1bf1f013724af25d71ad4310e5d4a
212.8 M  /hbase/data/default/C_CONS/ccabf798734aa8e05798c43c132ad565
85.1 M   /hbase/data/default/C_CONS/d1cb54346e109b1ba76fd95aa4540161
84.4 M   /hbase/data/default/C_CONS/d4dd8c3fa81b751892689cc92a96aa99
139.5 M  /hbase/data/default/C_CONS/dc15ceeed21474b51086f3103cbd0074
97.7 M   /hbase/data/default/C_CONS/df20e2077f22e83ecd8e0d52dea1
221.0 M  /hbase/data/default/C_CONS/e30d0d55e0887a676c8b79e03771ad23
75.7 M   /hbase/data/default/C_CONS/e6ed24ce0b3e1e903bd9757d28380f3a
74.9 M   /hbase/data/default/C_CONS/e9732d9905f5373fb0fd7a1ce033e17b
101.2 M  /hbase/data/default/C_CONS/f2a49dbaf018f0e45bbd7a758f123418
172.6 M  /hbase/data/default/C_CONS/f34645de36d3c1413ce83177e2118947
89.2 M   /hbase/data/default/C_CONS/f3db2bf3b7ffb7b4c0029eac5d631bdb
81.6 M   /hbase/data/default/C_CONS/f43b49c4f384853266e9ee45a98104a6
68.9 M   /hbase/data/default/C_CONS/fa4fb0047ec98fb10bf84fd72937f415
86.7 M   /hbase/data/default/C_CONS/fc69f349655676e046c9110550825f5a
155.0 M  /hbase/data/default/C_CONS/feb0835bdf73c257de11c65f18b1330d
75.2 M   /hbase/data/default/C_CONS/fff9fbe56af8b9e0e00826f8936e7a56



From the result above we can see that the biggest region's size is 346.6 M,
while most other regions' sizes are close to each other.

So what may be the real reason ?

2014-09-30 12:17 GMT+08:00 Vladimir Rodionov :

> HBase TableInputFormat creates one input split per region. You cannot
> achieve a high level of parallelism unless you have at least 5-10 regions
> per RS. What does that mean? You probably have too few regions. You can
> verify that in the HBase Web UI.
>
> -Vladimir Rodionov
>
> On Mon, Sep 29, 2014 at 7:21 PM, Tao Xiao 
> wrote:
>
>> I submitted a job in Yarn-Client mode, which simply reads from an HBase
>> table containing tens of millions of records and then does a *count*
>> action. The job runs for a much longer time than I expected, so I wonder
>> whether it was because the data to read was too much. Actually, there are
>> 20 nodes in my Hadoop cluster, so the HBase table seems not so big (tens
>> of millions of records).
>>
>> I'm using CDH 5.0.0 (Spark 0.9 and HBase 0.96).
>>
>> BTW, when the job was running, I can see logs on the console, and
>> specifically I'd like to know what the following log means:
>>
>> 14/09/30 09:45:20 INFO scheduler.TaskSetManager: Starting task 0.0:20 as
>> TID 20 on executor 2: b04.jsepc.com (PROCESS_LOCAL)
>> 14/09/30 09:45:20 INFO scheduler.TaskSetManager: Serialized task 0.0:20
>> as 13454 bytes in 0 ms
>> 14/09/30 09:45:20 INFO scheduler.TaskSetManager: Finished TID 19 in 16426
>> ms on b04.jsepc.com (progress: 18/86)
>> 14/09/30 09:45:20 INFO scheduler.DAGScheduler: Completed ResultTask(0, 19)
>>
>>
>> Thanks
>>
>
>


Re: Reading from HBase is too slow

2014-09-29 Thread Tao Xiao
I submitted the job in Yarn-Client mode using the following script:

export
SPARK_JAR=/usr/games/spark/xt/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar

export HADOOP_CLASSPATH=$(hbase classpath)
export
CLASSPATH=$CLASSPATH:/usr/games/spark/xt/SparkDemo-0.0.1-SNAPSHOT.jar:/usr/games/spark/xt/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar:/usr/games/spark/xt/hadoop-common-2.3.0-cdh5.0.1.jar:/usr/games/spark/xt/hbase-client-0.96.1.1-cdh5.0.1.jar:/usr/games/spark/xt/hbase-common-0.96.1.1-cdh5.0.1.jar:/usr/games/spark/xt/hbase-server-0.96.1.1-cdh5.0.1.jar:/usr/games/spark/xt/hbase-protocol-0.96.0-hadoop2.jar:/usr/games/spark/xt/htrace-core-2.01.jar:$HADOOP_CLASSPATH

CONFIG_OPTS="-Dspark.master=yarn-client
-Dspark.jars=/usr/games/spark/xt/SparkDemo-0.0.1-SNAPSHOT.jar,/usr/games/spark/xt/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar,/usr/games/spark/xt/hbase-client-0.96.1.1-cdh5.0.1.jar,/usr/games/spark/xt/hbase-common-0.96.1.1-cdh5.0.1.jar,/usr/games/spark/xt/hbase-server-0.96.1.1-cdh5.0.1.jar,/usr/games/spark/xt/hbase-protocol-0.96.0-hadoop2.jar,/usr/games/spark/xt/htrace-core-2.01.jar"

java -cp $CLASSPATH $CONFIG_OPTS com.xt.scala.TestSpark




My job's code is as follows:


object TestSpark {
  def main(args: Array[String]) {
readHBase("C_CONS")
  }

  def readHBase(tableName: String) {
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)

val sparkConf = new SparkConf()
.setAppName("<<< Reading HBase >>>")
val sc = new SparkContext(sparkConf)

val rdd = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
   classOf[ImmutableBytesWritable], classOf[Result])

println(rdd.count)

  }
}


2014-09-30 10:21 GMT+08:00 Tao Xiao :

> I submitted a job in Yarn-Client mode, which simply reads from an HBase
> table containing tens of millions of records and then does a *count*
> action. The job runs for a much longer time than I expected, so I wonder
> whether it was because the data to read was too much. Actually, there are
> 20 nodes in my Hadoop cluster, so the HBase table seems not so big (tens
> of millions of records).
>
> I'm using CDH 5.0.0 (Spark 0.9 and HBase 0.96).
>
> BTW, when the job was running, I can see logs on the console, and
> specifically I'd like to know what the following log means:
>
> 14/09/30 09:45:20 INFO scheduler.TaskSetManager: Starting task 0.0:20 as
> TID 20 on executor 2: b04.jsepc.com (PROCESS_LOCAL)
> 14/09/30 09:45:20 INFO scheduler.TaskSetManager: Serialized task 0.0:20 as
> 13454 bytes in 0 ms
> 14/09/30 09:45:20 INFO scheduler.TaskSetManager: Finished TID 19 in 16426
> ms on b04.jsepc.com (progress: 18/86)
> 14/09/30 09:45:20 INFO scheduler.DAGScheduler: Completed ResultTask(0, 19)
>
>
> Thanks
>


Reading from HBase is too slow

2014-09-29 Thread Tao Xiao
I submitted a job in Yarn-Client mode, which simply reads from an HBase
table containing tens of millions of records and then does a *count*
action. The job runs for a much longer time than I expected, so I wonder
whether it was because the data to read was too much. Actually, there are 20
nodes in my Hadoop cluster, so the HBase table seems not so big (tens of
millions of records).

I'm using CDH 5.0.0 (Spark 0.9 and HBase 0.96).

BTW, when the job was running, I can see logs on the console, and
specifically I'd like to know what the following log means:

14/09/30 09:45:20 INFO scheduler.TaskSetManager: Starting task 0.0:20 as
TID 20 on executor 2: b04.jsepc.com (PROCESS_LOCAL)
14/09/30 09:45:20 INFO scheduler.TaskSetManager: Serialized task 0.0:20 as
13454 bytes in 0 ms
14/09/30 09:45:20 INFO scheduler.TaskSetManager: Finished TID 19 in 16426
ms on b04.jsepc.com (progress: 18/86)
14/09/30 09:45:20 INFO scheduler.DAGScheduler: Completed ResultTask(0, 19)
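
(Reading that log, for what it's worth: "progress: 18/86" says the stage
has 86 tasks, presumably one per region of the table, of which 18 have
finished; TID 19 took 16426 ms on b04.jsepc.com; and PROCESS_LOCAL is the
locality level the scheduler chose for task 0.0:20.)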


Thanks


How to sort rdd filled with existing data structures?

2014-09-24 Thread Tao Xiao
Hi ,

I have the following rdd :

  val conf = new SparkConf()
  .setAppName("<< Testing Sorting >>")
  val sc = new SparkContext(conf)

  val L = List(
  (new Student("XiaoTao", 80, 29), "I'm Xiaotao"),
  (new Student("CCC", 100, 24), "I'm CCC"),
  (new Student("Jack", 90, 25), "I'm Jack"),
  (new Student("Tom", 60, 35), "I'm Tom"),
  (new Student("Lucy", 78, 22), "I'm Lucy"))

  val rdd = sc.parallelize(L, 3)


where Student is a class defined as follows:

class Student(val name:String, val score:Int, val age:Int)  {

 override def toString =
 "name:" + name + ", score:" + score + ", age:" + age

}



I want to sort the *rdd* by key, but when I wrote rdd.sortByKey it
complained that "No implicit Ordering defined", which means I must extend
the class with *Ordered* and provide a method named *compare*.  The
problem is that the class Student is from a third-party library, so I cannot
change its definition. I'd like to know if there is a sorting method to
which I can provide a customized compare function, so that it sorts the rdd
according to the function I provide.

One more question: if I want to sort RDD[(k, v)] by value, do I have to
map that rdd so that its key and value exchange their positions in the
tuple? Are there any functions that allow us to sort an rdd by things other
than the key ?
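
A sketch of one way out, assuming that putting an implicit Ordering in
scope satisfies sortByKey's bound for a class you cannot modify (the
score-then-age order is a made-up example):

  // Define how Students compare without touching the third-party class.
  implicit val studentOrdering: Ordering[Student] =
    Ordering.by(s => (s.score, s.age))

  val byKey = rdd.sortByKey()  // picks up studentOrdering implicitly

  // Sorting by value: swap key and value, sort, then swap back.
  val byValue = rdd.map(_.swap).sortByKey().map(_.swap)

Later Spark versions also offer rdd.sortBy(_._2), which sorts by an
arbitrary function of each element directly.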

Thanks


Re: combineByKey throws ClassCastException

2014-09-16 Thread Tao Xiao
This problem was caused by the fact that I used a packaged jar with a Spark
version (0.9.1) different from that of the cluster (0.9.0). When I used the
correct packaged jar
(spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar) instead, the
application ran as expected.



2014-09-15 14:57 GMT+08:00 x :

> How about this.
>
> scala> val rdd2 = rdd.combineByKey(
>  | (v: Int) => v.toLong,
>  | (c: Long, v: Int) => c + v,
>  | (c1: Long, c2: Long) => c1 + c2)
> rdd2: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[9] at
> combineB
> yKey at :14
>
> xj @ Tokyo
>
> On Mon, Sep 15, 2014 at 3:06 PM, Tao Xiao 
> wrote:
>
>> I followed an example presented in the tutorial Learning Spark
>> <http://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html>
>> to compute the per-key average as follows:
>>
>>
>> val Array(appName) = args
>> val sparkConf = new SparkConf()
>> .setAppName(appName)
>> val sc = new SparkContext(sparkConf)
>> /*
>>  * compute the per-key average of values
>>  * results should be:
>>  *A : 5.8
>>  *B : 14
>>  *C : 60.6
>>  */
>> val rdd = sc.parallelize(List(
>> ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
>> ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
>> ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2)
>> val avg = rdd.combineByKey(
>> (x:Int) => (x, 1),  // java.lang.ClassCastException: scala.Tuple2$mcII$sp
>> cannot be cast to java.lang.Integer
>> (acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1),
>> (acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
>> acc2._2))
>> .map{case (s, t) => (s, t._1/t._2.toFloat)}
>>  avg.collect.foreach(t => println(t._1 + " ->" + t._2))
>>
>>
>>
>> When I submitted the application, an exception of 
>> "*java.lang.ClassCastException:
>> scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer*" was thrown
>> out. The tutorial said that the first function of *combineByKey*, *(x:Int)
>> => (x, 1)*, should take a single element in the source RDD and return an
>> element of the desired type in the resulting RDD. In my application, we
>> take a single element of type *Int *from the source RDD and return a
>> tuple of type (*Int*, *Int*), which meets the requirements quite well.
>> But why would such an exception be thrown?
>>
>> I'm using CDH 5.0 and Spark 0.9
>>
>> Thanks.
>>
>>
>>
>


combineByKey throws ClassCastException

2014-09-14 Thread Tao Xiao
I followed an example presented in the tutorial Learning Spark

to compute the per-key average as follows:


val Array(appName) = args
val sparkConf = new SparkConf()
.setAppName(appName)
val sc = new SparkContext(sparkConf)
/*
 * compute the per-key average of values
 * results should be:
 *A : 5.8
 *B : 14
 *C : 60.6
 */
val rdd = sc.parallelize(List(
("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2)
val avg = rdd.combineByKey(
(x:Int) => (x, 1),  // java.lang.ClassCastException: scala.Tuple2$mcII$sp
cannot be cast to java.lang.Integer
(acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1),
(acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
acc2._2))
.map{case (s, t) => (s, t._1/t._2.toFloat)}
 avg.collect.foreach(t => println(t._1 + " ->" + t._2))



When I submitted the application, an exception of
"*java.lang.ClassCastException: scala.Tuple2$mcII$sp cannot be cast to
java.lang.Integer*" was thrown.
The tutorial said that the first function of *combineByKey*, *(x:Int) =>
(x, 1)*, should take a single element in the source RDD and return an
element of the desired type in the resulting RDD. In my application, we
take a single element of type *Int *from the source RDD and return a tuple
of type (*Int*, *Int*), which meets the requirements quite well. But why
would such an exception be thrown?

I'm using CDH 5.0 and Spark 0.9

Thanks.


Re: What is the appropriate privileges needed for writting files into checkpoint directory?

2014-09-03 Thread Tao Xiao
I found the answer: the checkpoint file system should be a fault-tolerant
file system like HDFS, so we should set the checkpoint directory to an HDFS
path, not a local file system path.
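
A one-line sketch of the fix (the HDFS URI below is a made-up example;
substitute your own namenode address):

  val ssc = new StreamingContext(sparkConf, Seconds(2))
  ssc.checkpoint("hdfs://namenode:8020/user/spark/checkpoint")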


2014-09-03 10:28 GMT+08:00 Tao Xiao :

> I tried to run  KafkaWordCount in a Spark standalone cluster.  In this
> application, the checkpoint directory was set as follows :
>
> val sparkConf = new SparkConf().setAppName("KafkaWordCount")
> val ssc =  new StreamingContext(sparkConf, Seconds(2))
> ssc.checkpoint("checkpoint")
>
>
> After submitting my application to the cluster, I could see the correct
> counting results on the console, but the running application kept
> complaining with the following:
>
> 14/09/03 10:01:22 WARN TaskSetManager: Loss was due to
> java.io.FileNotFoundException
> java.io.FileNotFoundException:
> /usr/games/SparkStreaming/checkpoint/a03505c8-0183-4bc0-b674-bf0e16767564/rdd-96/.part-0-attempt-171
> (Permission denied)
>   at java.io.FileOutputStream.open(Native Method)
>   at java.io.FileOutputStream.(FileOutputStream.java:194)
>   at
> org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.(RawLocalFileSystem.java:206)
>   at
> org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.(RawLocalFileSystem.java:202)
>   at
> org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:265)
>   at
> org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
>   at
> org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.(ChecksumFileSystem.java:384)
>   at
> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:443)
>   at
> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
>   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:849)
>   at
> org.apache.spark.rdd.CheckpointRDD$.writeToFile(CheckpointRDD.scala:103)
>   at
> org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
>   at
> org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>   at org.apache.spark.scheduler.Task.run(Task.scala:53)
>   at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>   at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>   at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:396)
>   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>   at
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>   at java.lang.Thread.run(Thread.java:662)
>
>
> On the node where I submitted the application, the checkpoint directory
> (/usr/games/SparkStreaming/checkpoint) was created and some files were
> created there, but no such directory existed on the other nodes of the
> Spark cluster.
>
> I guess that was because processes on other nodes of the cluster didn't
> have appropriate privileges to create the checkpoint directory. So I
> created that directory on each node manually and changed its mode to 777,
> which means any user can write to that directory. But the SparkStreaming
> application still kept throwing that exception.
>
> So what is the real reason?  Thanks.
>
>
>
>


What are the appropriate privileges needed for writing files into the checkpoint directory?

2014-09-02 Thread Tao Xiao
I tried to run KafkaWordCount in a Spark standalone cluster. In this
application, the checkpoint directory was set as follows:

val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc =  new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")


After submitting my application to the cluster, I could see the correct
counting results on the console, but the running application kept
complaining with the following:

14/09/03 10:01:22 WARN TaskSetManager: Loss was due to
java.io.FileNotFoundException
java.io.FileNotFoundException:
/usr/games/SparkStreaming/checkpoint/a03505c8-0183-4bc0-b674-bf0e16767564/rdd-96/.part-0-attempt-171
(Permission denied)
  at java.io.FileOutputStream.open(Native Method)
  at java.io.FileOutputStream.(FileOutputStream.java:194)
  at
org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.(RawLocalFileSystem.java:206)
  at
org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.(RawLocalFileSystem.java:202)
  at
org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:265)
  at
org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
  at
org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.(ChecksumFileSystem.java:384)
  at
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:443)
  at
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:849)
  at
org.apache.spark.rdd.CheckpointRDD$.writeToFile(CheckpointRDD.scala:103)
  at
org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
  at
org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
  at org.apache.spark.scheduler.Task.run(Task.scala:53)
  at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
  at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
  at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:396)
  at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
  at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
  at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
  at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
  at java.lang.Thread.run(Thread.java:662)


On the node where I submitted the application, the checkpoint directory
(/usr/games/SparkStreaming/checkpoint) was created and some files were
created there, but no such directory existed on the other nodes of the
Spark cluster.

I guess that was because processes on the other nodes of the cluster
didn't have the appropriate privileges to create the checkpoint
directory. So I created that directory on each node manually and changed
its mode to 777, which means any user can write to it. But the Spark
Streaming application still kept throwing that exception.

So what is the real reason?  Thanks.


Re: What does "appMasterRpcPort: -1" indicate ?

2014-08-31 Thread Tao Xiao
Thanks Yi, I think your answers make sense.

We can see a series of messages with "appMasterRpcPort: -1" followed by a
message with "appMasterRpcPort: 0"; perhaps that means we were waiting for
the application master to start ("appMasterRpcPort: -1"), and later the
application master did start ("appMasterRpcPort: 0").
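
To see the same thing directly, here is a minimal sketch (assuming
Hadoop 2.x's YarnClient API; appId stands in for the submitted
application's ApplicationId) of polling the report that
YarnClientSchedulerBackend keeps printing:

import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

val yarnClient = YarnClient.createYarnClient()
yarnClient.init(new YarnConfiguration())
yarnClient.start()

// While the application is ACCEPTED the AM has not registered yet, so
// the report carries an RPC port of -1; once the state turns RUNNING
// the AM has registered and reports a real port.
val report = yarnClient.getApplicationReport(appId)
println(s"state=${report.getYarnApplicationState}, rpcPort=${report.getRpcPort}")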


2014-08-31 23:10 GMT+08:00 Yi Tian :

> I think -1 means your application master has not been started yet.
>
>
> On Aug 31, 2014, at 23:02, Tao Xiao wrote:
>
> I'm using CDH 5.1.0, which bundles Spark 1.0.0.
>
> Following How-to: Run a Simple Apache Spark App in CDH 5, I tried to
> submit my job in local mode, Spark Standalone mode and YARN mode. I
> successfully submitted my job in local mode and Standalone mode; however,
> I noticed the following messages printed on the console when I submitted
> my job in YARN mode:
>
>
> 14/08/29 22:27:29 INFO Client: Submitting application to ASM
>
> 14/08/29 22:27:29 INFO YarnClientImpl: Submitted application
> application_1406949333981_0015
>
> 14/08/29 22:27:29 INFO YarnClientSchedulerBackend: Application report from
> ASM:
>
>   appMasterRpcPort: -1
>
>   appStartTime: 1409365649836
>
>   yarnAppState: ACCEPTED
>
>
> 14/08/29 22:27:30 INFO YarnClientSchedulerBackend: Application report from
> ASM:
>
>   appMasterRpcPort: -1
>
>   appStartTime: 1409365649836
>
>   yarnAppState: ACCEPTED
>
>
> 14/08/29 22:27:31 INFO YarnClientSchedulerBackend: Application report from
> ASM:
>
>   appMasterRpcPort: -1
>
>   appStartTime: 1409365649836
>
>   yarnAppState: ACCEPTED
>
>
> 14/08/29 22:27:32 INFO YarnClientSchedulerBackend: Application report from
> ASM:
>
>   appMasterRpcPort: -1
>
>   appStartTime: 1409365649836
>
>   yarnAppState: ACCEPTED
>
>
> 14/08/29 22:27:33 INFO YarnClientSchedulerBackend: Application report from
> ASM:
>
>   appMasterRpcPort: -1
>
>   appStartTime: 1409365649836
>
>   yarnAppState: ACCEPTED
>
>
> 14/08/29 22:27:34 INFO YarnClientSchedulerBackend: Application report from
> ASM:
>
>   appMasterRpcPort: -1
>
>   appStartTime: 1409365649836
>
>   yarnAppState: ACCEPTED
>
>
> 14/08/29 22:27:35 INFO YarnClientSchedulerBackend: Application report from
> ASM:
>
>   appMasterRpcPort: -1
>
>   appStartTime: 1409365649836
>
>   yarnAppState: ACCEPTED
>
>
> 14/08/29 22:27:36 INFO YarnClientSchedulerBackend: Application report from
> ASM:
>
>   appMasterRpcPort: -1
>
>   appStartTime: 1409365649836
>
>   yarnAppState: ACCEPTED
>
>
> 14/08/29 22:27:37 INFO YarnClientSchedulerBackend: Application report from
> ASM:
>
>   appMasterRpcPort: -1
>
>   appStartTime: 1409365649836
>
>   yarnAppState: ACCEPTED
>
>
> 14/08/29 22:27:38 INFO YarnClientSchedulerBackend: Application report from
> ASM:
>
>   appMasterRpcPort: -1
>
>   appStartTime: 1409365649836
>
>   yarnAppState: ACCEPTED
>
>
> 14/08/29 22:27:39 INFO YarnClientSchedulerBackend: Application report from
> ASM:
>
>   appMasterRpcPort: 0
>
>   appStartTime: 1409365649836
>
>   yarnAppState: RUNNING
>
>
> The job finished successfully and produced correct results.
> But I'm not sure what those messages mean. Does "appMasterRpcPort: -1"
> indicate an error or an exception?
>
>
>


What does "appMasterRpcPort: -1" indicate ?

2014-08-31 Thread Tao Xiao
I'm using CDH 5.1.0, which bundles Spark 1.0.0.

Following How-to: Run a Simple Apache Spark App in CDH 5, I tried to
submit my job in local mode, Spark Standalone mode and YARN mode. I
successfully submitted my job in local mode and Standalone mode; however,
I noticed the following messages printed on the console when I submitted
my job in YARN mode:


14/08/29 22:27:29 INFO Client: Submitting application to ASM

14/08/29 22:27:29 INFO YarnClientImpl: Submitted application
application_1406949333981_0015

14/08/29 22:27:29 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:30 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:31 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:32 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:33 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:34 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:35 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:36 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:37 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:38 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:39 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: 0

  appStartTime: 1409365649836

  yarnAppState: RUNNING


The job finished successfully and produced correct results.
But I'm not sure what those messages mean. Does "appMasterRpcPort: -1"
indicate an error or an exception?


How can a "deserialized Java object" be stored on disk?

2014-08-30 Thread Tao Xiao
Reading about RDD Persistency, I learned that the storage level
"MEMORY_AND_DISK" means "Store RDD as deserialized Java objects in the
JVM. If the RDD does not fit in memory, store the partitions that don't
fit on disk, and read them from there when they're needed."

But how can a "deserialized Java object" be stored on disk? As far as I
know, a Java object is stored on disk as an array of bytes, which means
the Java object must first be converted into an array of bytes (a
serialized object).

Thanks.
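
The "deserialized" in the level's name describes only the in-memory
representation; anything Spark spills to disk is indeed written as
serialized bytes. A minimal sketch contrasting the two related levels
(rdd1 and rdd2 stand in for any RDDs):

import org.apache.spark.storage.StorageLevel

// Partitions kept as plain JVM objects while in memory; partitions
// that spill are serialized on the way to disk and deserialized again
// when read back.
rdd1.persist(StorageLevel.MEMORY_AND_DISK)

// Partitions kept as serialized bytes even in memory, trading CPU
// time for a smaller footprint.
rdd2.persist(StorageLevel.MEMORY_AND_DISK_SER)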


Fwd: What does "appMasterRpcPort: -1" indicate ?

2014-08-30 Thread Tao Xiao
I'm using CDH 5.1.0, which bundles Spark 1.0.0.

Following How-to: Run a Simple Apache Spark App in CDH 5, I tried to
submit my job in local mode, Spark Standalone mode and YARN mode. I
successfully submitted my job in local mode and Standalone mode; however,
I noticed the following messages printed on the console when I submitted
my job in YARN mode:

 14/08/29 22:27:29 INFO Client: Submitting application to ASM
14/08/29 22:27:29 INFO YarnClientImpl: Submitted application
application_1406949333981_0015
14/08/29 22:27:29 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:30 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:31 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:32 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:33 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:34 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:35 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:36 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:37 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:38 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:39 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: 0
 appStartTime: 1409365649836
 yarnAppState: RUNNING

The job finished successfully and produced correct results.
But I'm not sure what those messages mean. Does "appMasterRpcPort: -1"
indicate an error or an exception?

Thanks


What does "appMasterRpcPort: -1" indicate ?

2014-08-29 Thread Tao Xiao
I'm using CDH 5.1.0, which bundles Spark 1.0.0.

Following How-to: Run a Simple Apache Spark App in CDH 5, I tried to
submit my job in local mode, Spark Standalone mode and YARN mode. I
successfully submitted my job in local mode and Standalone mode; however,
I noticed the following messages printed on the console when I submitted
my job in YARN mode:

14/08/29 22:27:29 INFO Client: Submitting application to ASM
14/08/29 22:27:29 INFO YarnClientImpl: Submitted application
application_1406949333981_0015
14/08/29 22:27:29 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:30 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:31 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:32 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:33 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:34 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:35 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:36 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:37 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:38 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: -1
appStartTime: 1409365649836
yarnAppState: ACCEPTED
14/08/29 22:27:39 INFO YarnClientSchedulerBackend: Application report from
ASM:
appMasterRpcPort: 0
appStartTime: 1409365649836
yarnAppState: RUNNING

The job finished successfully and produced correct results.
But I'm not sure what those messages mean. Does "appMasterRpcPort: -1"
indicate an error or an exception?

Thanks


How to provide a custom Comparator to sortByKey?

2014-02-28 Thread Tao Xiao
I am using Spark 0.9.
I have an array of tuples, and I want to sort these tuples using the
*sortByKey* API as follows in the Spark shell:

val A: Array[(String, String)] = Array(("1", "One"), ("9", "Nine"),
  ("3", "three"), ("5", "five"), ("4", "four"))
val P = sc.parallelize(A)

// MyComparator is an example; the real implementation may be more complex
class MyComparator extends java.util.Comparator[String] {
  def compare(s1: String, s2: String): Int = s1.compareTo(s2)
}

val comp = new MyComparator()
P.sortByKey(comp, true)


When I invoked P.sortByKey(comp, true), the Spark shell complained that
there was a type mismatch; to be specific, *sortByKey* requires a
*Boolean* but I provided a *Comparator*.

How should I provide my custom comparator to *sortByKey*?
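
For reference, a sketch of one way to do this in Spark 1.0 and later,
where sortByKey resolves an implicit Ordering[K]: put the custom
comparison in implicit scope instead of passing it as an argument (the
ordering below, which reverses the natural String order, is only an
example):

import org.apache.spark.SparkContext._  // RDD-to-OrderedRDDFunctions implicits

val A: Array[(String, String)] = Array(("1", "One"), ("9", "Nine"),
  ("3", "three"), ("5", "five"), ("4", "four"))
val P = sc.parallelize(A)

// A locally defined implicit Ordering takes precedence over the default
// Ordering[String]; fromLessThan builds one from a comparison function.
implicit val reverseStringOrdering: Ordering[String] =
  Ordering.fromLessThan[String](_.compareTo(_) > 0)

// "Ascending" is now interpreted under reverseStringOrdering, so the
// keys come back in the order 9, 5, 4, 3, 1.
P.sortByKey(true).collect()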


Re: Need some tutorials and examples about customized partitioner

2014-02-27 Thread Tao Xiao
Also thanks Matei


2014-02-26 15:19 GMT+08:00 Matei Zaharia :

> Take a look at the "advanced Spark features" talk here too:
> http://ampcamp.berkeley.edu/amp-camp-one-berkeley-2012/.
>
> Matei
>
> On Feb 25, 2014, at 6:22 PM, Tao Xiao  wrote:
>
> Thank you Mayur, I think that will help me a lot
>
>
> Best,
> Tao
>
>
> 2014-02-26 8:56 GMT+08:00 Mayur Rustagi :
>
>> The type of shuffling is best explained by Matei in Spark Internals:
>> http://www.youtube.com/watch?v=49Hr5xZyTEA#t=2203
>> Why don't you look at that and then ask any follow-up questions here?
>> It would also be good to watch the whole talk, as it covers Spark job
>> flows in a lot more detail.
>>
>> Scala:
>> import org.apache.spark.RangePartitioner
>> val file = sc.textFile("")
>> val partitionedFile = file.map(x => (x, 1))
>> val data = partitionedFile.partitionBy(new RangePartitioner(3, partitionedFile))
>> data.glom().collect()(0).length
>> data.glom().collect()(1).length
>> data.glom().collect()(2).length
>> This will sample the RDD partitionedFile and then try to split
>> partitionedFile into partitions of almost equal size.
>> Do not call collect if your data size is huge, as that may OOM the
>> driver; write to disk in that case.
>>
>>
>>
>>
>> Mayur Rustagi
>> Ph: +919632149971
>> http://www.sigmoidanalytics.com
>> https://twitter.com/mayur_rustagi
>>
>>
>>
>> On Tue, Feb 25, 2014 at 1:19 AM, Tao Xiao wrote:
>>
>>> I am a newbie to Spark and I need to know how RDD partitioning can be
>>> controlled in the process of shuffling. I have googled for examples
>>> but haven't found many concrete ones, in contrast with the many good
>>> tutorials about Hadoop's shuffling and partitioner.
>>>
>>> Can anybody show me good tutorials explaining the process of shuffling
>>> in Spark, as well as examples of how to use a customized partitioner?
>>>
>>>
>>> Best,
>>> Tao
>>>
>>
>>
>
>


Re: Need some tutorials and examples about customized partitioner

2014-02-25 Thread Tao Xiao
Thank you Mayur, I think that will help me a lot


Best,
Tao


2014-02-26 8:56 GMT+08:00 Mayur Rustagi :

> The type of shuffling is best explained by Matei in Spark Internals:
> http://www.youtube.com/watch?v=49Hr5xZyTEA#t=2203
> Why don't you look at that and then ask any follow-up questions here?
> It would also be good to watch the whole talk, as it covers Spark job
> flows in a lot more detail.
>
> Scala:
> import org.apache.spark.RangePartitioner
> val file = sc.textFile("")
> val partitionedFile = file.map(x => (x, 1))
> val data = partitionedFile.partitionBy(new RangePartitioner(3, partitionedFile))
> data.glom().collect()(0).length
> data.glom().collect()(1).length
> data.glom().collect()(2).length
> This will sample the RDD partitionedFile and then try to split
> partitionedFile into partitions of almost equal size.
> Do not call collect if your data size is huge, as that may OOM the
> driver; write to disk in that case.
>
>
>
>
> Mayur Rustagi
> Ph: +919632149971
> http://www.sigmoidanalytics.com
> https://twitter.com/mayur_rustagi
>
>
>
> On Tue, Feb 25, 2014 at 1:19 AM, Tao Xiao wrote:
>
>> I am a newbie to Spark and I need to know how RDD partitioning can be
>> controlled in the process of shuffling. I have googled for examples
>> but haven't found many concrete ones, in contrast with the many good
>> tutorials about Hadoop's shuffling and partitioner.
>>
>> Can anybody show me good tutorials explaining the process of shuffling
>> in Spark, as well as examples of how to use a customized partitioner?
>>
>>
>> Best,
>> Tao
>>
>
>


Need some tutorials and examples about customized partitioner

2014-02-25 Thread Tao Xiao
I am a newbie to Spark and I need to know how RDD partitioning can be
controlled in the process of shuffling. I have googled for examples but
haven't found many concrete ones, in contrast with the many good
tutorials about Hadoop's shuffling and partitioner.

Can anybody show me good tutorials explaining the process of shuffling in
Spark, as well as examples of how to use a customized partitioner?


Best,
Tao
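
For reference, a minimal sketch of a customized partitioner (the class
name and routing rule below are illustrative): subclass
org.apache.spark.Partitioner, implement numPartitions and getPartition,
and hand an instance to partitionBy.

import org.apache.spark.Partitioner

// Route String keys to partitions by their first character; anything
// unexpected falls into partition 0.
class FirstCharPartitioner(partitions: Int) extends Partitioner {
  require(partitions > 0, "need at least one partition")

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case s: String if s.nonEmpty => s.charAt(0).toInt % numPartitions
    case _ => 0
  }

  // Equal partitioners let Spark avoid unnecessary re-shuffles.
  override def equals(other: Any): Boolean = other match {
    case p: FirstCharPartitioner => p.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}

// Usage on a pair RDD:
// sc.parallelize(Seq(("apple", 1), ("banana", 2), ("cherry", 3)))
//   .partitionBy(new FirstCharPartitioner(3))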