Re: Array in broadcast can't be serialized

2015-02-16 Thread Tao Xiao
Thanks Ted

After searching for a whole day, I still don't know how to make Spark use
twitter chill serialization - there are very few documents about how to
integrate twitter chill into Spark for serialization. I tried the
following, but an exception of java.lang.ClassCastException:
com.twitter.chill.WrappedArraySerializer cannot be cast to
org.apache.spark.serializer.Serializer was thrown:

val conf = new SparkConf()
   .setAppName("Test Serialization")
   .set("spark.serializer", "com.twitter.chill.WrappedArraySerializer")


Well, what is the correct way of configuring Spark to use the twitter chill
serialization framework ?
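
For what it's worth, the ClassCastException is expected: spark.serializer must
name a subclass of org.apache.spark.serializer.Serializer (such as
KryoSerializer), while chill's WrappedArraySerializer is a Kryo serializer, so
it belongs inside a Kryo registrator instead. A minimal sketch, assuming
chill-scala is on the classpath and treating the exact registration call as an
assumption rather than a confirmed recipe:

    import com.esotericsoftware.kryo.Kryo
    import com.twitter.chill.WrappedArraySerializer
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator

    // hypothetical registrator wiring a chill serializer into Spark's Kryo setup
    class ChillRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit =
        kryo.register(classOf[scala.collection.mutable.WrappedArray.ofRef[_]],
          new WrappedArraySerializer[AnyRef])
    }

    val conf = new SparkConf()
      .setAppName("Test Serialization")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "xt.ChillRegistrator")  // hypothetical package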







2015-02-15 22:23 GMT+08:00 Ted Yu yuzhih...@gmail.com:

 I was looking at https://github.com/twitter/chill

 It seems this would achieve what you want:
 chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala

 Cheers

 On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao xiaotao.cs@gmail.com
 wrote:

 I'm using Spark 1.1.0 and found that *ImmutableBytesWritable* can be
 serialized by Kryo but *Array[ImmutableBytesWritable]* can't be
 serialized even when I registered both of them in Kryo.

 The code is as follows:

   val conf = new SparkConf()
     .setAppName("Hello Spark")
     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     .set("spark.kryo.registrator", "xt.MyKryoRegistrator")

   val sc = new SparkContext(conf)

   val rdd = sc.parallelize(List(
     (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new KeyValue()),
     (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new KeyValue()),
     (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new KeyValue()),
     (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new KeyValue())), 4)

   // snippet 1: a single object of *ImmutableBytesWritable* can be
   // serialized in broadcast
   val partitioner = new SingleElementPartitioner(sc.broadcast(
     new ImmutableBytesWritable(Bytes.toBytes(3))))
   val ret = rdd.aggregateByKey(List[KeyValue](), partitioner)(
     (xs: List[KeyValue], y: KeyValue) => y :: xs,
     (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys).persist()
   println("\n\n\nret.count = " + ret.count + ",  partition size = " +
     ret.partitions.size)

   // snippet 2: an array of *ImmutableBytesWritable* can not be
   // serialized in broadcast
   val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)),
     new ImmutableBytesWritable(Bytes.toBytes(2)),
     new ImmutableBytesWritable(Bytes.toBytes(3)))
   val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
   val ret1 = rdd.aggregateByKey(List[KeyValue](), newPartitioner)(
     (xs: List[KeyValue], y: KeyValue) => y :: xs,
     (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys)
   println("\n\n\nrdd2.count = " + ret1.count)

   sc.stop


   // the following are the kryo registrator and partitioners
   class MyKryoRegistrator extends KryoRegistrator {
     override def registerClasses(kryo: Kryo): Unit = {
       kryo.register(classOf[ImmutableBytesWritable])          // register ImmutableBytesWritable
       kryo.register(classOf[Array[ImmutableBytesWritable]])   // register Array[ImmutableBytesWritable]
     }
   }

   class SingleElementPartitioner(bc: Broadcast[ImmutableBytesWritable]) extends Partitioner {
     override def numPartitions: Int = 5
     def v = Bytes.toInt(bc.value.get)
     override def getPartition(key: Any): Int = v - 1
   }

   class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
     val arr = bc.value
     override def numPartitions: Int = arr.length
     override def getPartition(key: Any): Int = Bytes.toInt(arr(0).get)
   }



 In the code above, snippet 1 works as expected. But snippet 2 throws "Task
 not serializable: java.io.NotSerializableException:
 org.apache.hadoop.hbase.io.ImmutableBytesWritable".


 So do I have to implement a Kryo serializer for Array[T] if it is used in
 broadcast ?

 Thanks








Array in broadcast can't be serialized

2015-02-15 Thread Tao Xiao
I'm using Spark 1.1.0 and found that *ImmutableBytesWritable* can be
serialized by Kryo but *Array[ImmutableBytesWritable]* can't be serialized
even when I registered both of them in Kryo.

The code is as follows:

   val conf = new SparkConf()
     .setAppName("Hello Spark")
     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     .set("spark.kryo.registrator", "xt.MyKryoRegistrator")

   val sc = new SparkContext(conf)

   val rdd = sc.parallelize(List(
     (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new KeyValue()),
     (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new KeyValue()),
     (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new KeyValue()),
     (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new KeyValue())), 4)

   // snippet 1: a single object of *ImmutableBytesWritable* can be
   // serialized in broadcast
   val partitioner = new SingleElementPartitioner(sc.broadcast(
     new ImmutableBytesWritable(Bytes.toBytes(3))))
   val ret = rdd.aggregateByKey(List[KeyValue](), partitioner)(
     (xs: List[KeyValue], y: KeyValue) => y :: xs,
     (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys).persist()
   println("\n\n\nret.count = " + ret.count + ",  partition size = " +
     ret.partitions.size)

   // snippet 2: an array of *ImmutableBytesWritable* can not be
   // serialized in broadcast
   val arr = Array(new ImmutableBytesWritable(Bytes.toBytes(1)),
     new ImmutableBytesWritable(Bytes.toBytes(2)),
     new ImmutableBytesWritable(Bytes.toBytes(3)))
   val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
   val ret1 = rdd.aggregateByKey(List[KeyValue](), newPartitioner)(
     (xs: List[KeyValue], y: KeyValue) => y :: xs,
     (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys)
   println("\n\n\nrdd2.count = " + ret1.count)

   sc.stop


   // the following are the kryo registrator and partitioners
   class MyKryoRegistrator extends KryoRegistrator {
     override def registerClasses(kryo: Kryo): Unit = {
       kryo.register(classOf[ImmutableBytesWritable])          // register ImmutableBytesWritable
       kryo.register(classOf[Array[ImmutableBytesWritable]])   // register Array[ImmutableBytesWritable]
     }
   }

   class SingleElementPartitioner(bc: Broadcast[ImmutableBytesWritable]) extends Partitioner {
     override def numPartitions: Int = 5
     def v = Bytes.toInt(bc.value.get)
     override def getPartition(key: Any): Int = v - 1
   }

   class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
     val arr = bc.value
     override def numPartitions: Int = arr.length
     override def getPartition(key: Any): Int = Bytes.toInt(arr(0).get)
   }



In the code above, snippet 1 works as expected. But snippet 2 throws "Task
not serializable: java.io.NotSerializableException:
org.apache.hadoop.hbase.io.ImmutableBytesWritable".


So do I have to implement a Kryo serializer for Array[T] if it is used in
broadcast ?

Thanks
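
One likely explanation, worth checking: the two partitioners differ in how
they touch the broadcast. SingleElementPartitioner reads bc.value inside a
def, so only the serializable Broadcast handle travels with the task, while
ArrayPartitioner stores val arr = bc.value as a field, so the raw
Array[ImmutableBytesWritable] is embedded in the partitioner and goes through
Java serialization, where the Kryo registration does not apply. A sketch of
the fix under that assumption:

    // sketch, assuming the failure comes from Java-serializing the
    // partitioner itself rather than the broadcast value
    class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]])
        extends Partitioner {
      // a 'def' defers the dereference to the executor and keeps the raw
      // array out of the serialized partitioner
      private def arr = bc.value
      override def numPartitions: Int = arr.length
      override def getPartition(key: Any): Int = Bytes.toInt(arr(0).get)
    }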


Can not write out data as snappy-compressed files

2014-12-08 Thread Tao Xiao
I'm using CDH 5.1.0 and Spark 1.0.0, and I'd like to write out data
as snappy-compressed files but encountered a problem.

My code is as follows:

  val InputTextFilePath = "hdfs://ec2.hadoop.com:8020/xt/text/new.txt"
  val OutputTextFilePath = "hdfs://ec2.hadoop.com:8020/xt/compressedText/"

  val sparkConf = new SparkConf()
    .setAppName("compress data").setMaster(Master)
    .set("spark.executor.memory", "8g")
    .set("spark.cores.max", "12")
    .set("spark.io.compression.codec",
      "org.apache.spark.io.SnappyCompressionCodec")

  val sc = new SparkContext(sparkConf)

  val rdd = sc.textFile(InputTextFilePath)
  rdd.saveAsTextFile(OutputTextFilePath,
    classOf[org.apache.hadoop.io.compress.SnappyCodec])

  sc.stop


When I submitted the job the following exception was thrown:
14/12/08 21:16:15 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/12/08 21:16:21 WARN TaskSetManager: Lost TID 0 (task 0.0:0)
14/12/08 21:16:21 WARN TaskSetManager: Loss was due to
java.lang.UnsatisfiedLinkError
java.lang.UnsatisfiedLinkError:
org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
 at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native
Method)
 at
org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
 at
org.apache.hadoop.io.compress.SnappyCodec.createCompressor(SnappyCodec.java:143)
 at
org.apache.hadoop.io.compress.SnappyCodec.createOutputStream(SnappyCodec.java:98)
 at
org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:136)
 at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:89)
 at org.apache.spark.rdd.PairRDDFunctions.org
$apache$spark$rdd$PairRDDFunctions$$writeToFile$1(PairRDDFunctions.scala:825)
 at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:840)
 at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:840)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 at org.apache.spark.scheduler.Task.run(Task.scala:51)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

It seems that Spark could not find snappy's native library.
I searched for solutions on the Internet and tried the following ways:

Adding
"spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec" to
*/etc/spark/conf.cloudera.spark/spark-defaults.conf*

Adding the following configurations to
*/etc/spark/conf.cloudera.spark/spark-env.sh*
export
JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop/lib/native
export
LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop/lib/native
export
SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop/lib/native/libsnappy.so
export
SPARK_CLASSPATH=/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop/lib/snappy-java-1.0.4.1.jar

Yet neither works. Can anybody tell me what I should do to read and
write Snappy-compressed files in Spark ?

Any answer is appreciated.
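
If the native library is present on the nodes but only exported on the
submitting machine, one hedged option on Spark 1.0 is to pass the library path
to the executor JVMs as well; a minimal sketch, assuming
spark.executor.extraLibraryPath is honored by this CDH deployment:

    val sparkConf = new SparkConf()
      .setAppName("compress data")
      // assumption: executors also need java.library.path to include the
      // Hadoop native directory that holds libsnappy
      .set("spark.executor.extraLibraryPath",
        "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop/lib/native")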


A partitionBy problem

2014-11-18 Thread Tao Xiao
Hi all,

 I tested the *partitionBy* feature in a wordcount application, and I'm
puzzled by a phenomenon. In this application, I created an rdd from some
text files in HDFS (about 100GB in size), each of which has lines composed
of words separated by the character "#". I wanted to count the occurrence of
each distinct word. *All lines have the same contents so finally the result
should be very small in bytes*.  The code is as follows:

  val text = sc.textFile(inputDir)
  val tuples = text.flatMap(line => line.split("#"))
   .map((_, 1))
   .reduceByKey(_ + _)
  tuples.collect.foreach{ case (word, count) => println(word + " - " + count) }

I submitted the application to a Spark cluster of 5 nodes and ran it in
standalone mode. From the application UI
http://imgbin.org/index.php?page=image&id=20976, we can see that the
shuffle for *collect* and *reduceByKey* used little bandwidth
(766.4KB for *collect*'s shuffle read and 961KB for *reduceByKey*'s shuffle
write).

*However, the shuffle used quite a lot of bandwidth when I
added partitionBy like this:*

  val text = sc.textFile(inputDir)
  val tuples = text.flatMap(line => line.split("#"))
    .map((_, 1))
    .partitionBy(new HashPartitioner(100))
    .reduceByKey(_ + _)
  tuples.collect.foreach{ case (word, count) => println(word + " - " + count) }

From the application UI http://imgbin.org/index.php?page=image&id=20977,
we can see that the shuffle read for *collect* is 2.8GB and the shuffle
write for *map* is 3.5GB.

The *map* transformations are applied on 5 nodes of the cluster because the
HDFS blocks are distributed among these 5 nodes. The *map* transformation is
applied to each element of the rdd on different nodes and doesn't need to
shuffle the new rdd. *So my first question is: why did the map
transformation use so much bandwidth (3.5GB) when I added partitionBy in
the code?*

When *collect* is applied, it needs to collect the results, namely (*word*,
*totalCount*) tuples, from 5 nodes to the driver. That process should use
very little bandwidth because all lines have the same contents, like
"AAA#BBB#CCC#DDD", which means the final results *collect* retrieved
should be very small in bytes (for example, hundreds of KB). *So my second
question is: why did the collect action use so much bandwidth (2.8GB)
when I added partitionBy in the code?*

*And the third question: when I add partitionBy for an rdd, it will
return a new rdd. Does that mean the rdd will be immediately shuffled
across nodes to meet the requirement specified by the supplied partitioner,
or will the supplied partitioner merely be a marker indicating how to
partition the rdd later?*

Thanks.
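
On the third question, partitionBy is an ordinary transformation: it returns
a new rdd lazily, and the shuffle it implies only runs when an action does. On
the bandwidth, one plausible reading is that inserting partitionBy before
reduceByKey shuffles every raw (word, 1) pair, whereas reduceByKey alone
combines map-side before shuffling. A sketch of the alternative, assuming the
goal is simply 100 reduce partitions:

    // pass the partitioner to reduceByKey so map-side combining still runs
    // before the shuffle, instead of shuffling all raw pairs via partitionBy
    val tuples = text.flatMap(line => line.split("#"))
      .map((_, 1))
      .reduceByKey(new HashPartitioner(100), _ + _)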


Re: How to kill a Spark job running in cluster mode ?

2014-11-12 Thread Tao Xiao
Thanks for your replies.

Actually we can kill a driver by the command "bin/spark-class
org.apache.spark.deploy.Client kill <spark-master> <driver-id>" if you know
the driver id.

2014-11-11 22:35 GMT+08:00 Ritesh Kumar Singh riteshoneinamill...@gmail.com
:

 There is a property:
spark.ui.killEnabled
 which needs to be set to true for killing applications directly from the
 webUI.
 Check the link:
 Kill Enable spark job
 http://spark.apache.org/docs/latest/configuration.html#spark-ui

 Thanks

 On Tue, Nov 11, 2014 at 7:42 PM, Sonal Goyal sonalgoy...@gmail.com
 wrote:

 The web interface has a kill link. You can try using that.

 Best Regards,
 Sonal
 Founder, Nube Technologies http://www.nubetech.co

 http://in.linkedin.com/in/sonalgoyal



 On Tue, Nov 11, 2014 at 7:28 PM, Tao Xiao xiaotao.cs@gmail.com
 wrote:

 I'm using Spark 1.0.0 and I'd like to kill a job running in cluster
 mode, which means the driver is not running on the local node.

 So how can I kill such a job? Is there a command like "hadoop job -kill
 <job-id>" which kills a running MapReduce job ?

 Thanks






How to kill a Spark job running in cluster mode ?

2014-11-11 Thread Tao Xiao
I'm using Spark 1.0.0 and I'd like to kill a job running in cluster mode,
which means the driver is not running on the local node.

So how can I kill such a job? Is there a command like "hadoop job -kill
<job-id>" which kills a running MapReduce job ?

Thanks


Re: All executors run on just a few nodes

2014-10-20 Thread Tao Xiao
Raymond,

Thank you.

But I read from another thread
http://apache-spark-user-list.1001560.n3.nabble.com/When-does-Spark-switch-from-PROCESS-LOCAL-to-NODE-LOCAL-or-RACK-LOCAL-td7091.html
that PROCESS_LOCAL means the data is in the same JVM as the code that is
running. When data is in the same JVM as the code that is running, the
data should be on the same node as the JVM, i.e., the data can be said to be
local.

Also, you said that the tasks will be assigned to available executors which
satisfy the application's requirements. But what requirements must an
executor satisfy so that tasks can be assigned to it? Do you mean resources
(memory, CPU)?

Finally, is there any way to guarantee that all executors for an
application will run on all Spark nodes when the data to be processed is big
enough (for example, HBase data resides on all RegionServers)?




2014-10-20 11:35 GMT+08:00 raymond rgbbones.m...@gmail.com:

 My best guess is that the speed at which your executors get registered
 with the driver differs between runs.

 When you run it for the first time, the executors are not fully registered
 when the task set manager starts to assign tasks, and thus the tasks are
 assigned to the available executors which already satisfy what you need,
 say 86 with one batch.

 And “PROCESS_LOCAL” does not necessarily mean that the data is local;
 it could be that the executor is not available yet for the data source (in
 your case, though, it might become available later).

 If this is the case, you could just sleep a few seconds before running the
 job. Or there are some related patches providing other ways to sync
 executor status before running applications, but I haven’t tracked their
 status for a while.


 Raymond
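
 A literal sketch of the sleep workaround described above, assuming the slow
 runs really are caused by tasks being scheduled before most executors have
 registered:

     val sc = new SparkContext(conf)
     Thread.sleep(10000)  // crude: give executors time to register with the driver
     // ... create the HBase RDD and run the job afterwards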

 On Oct 20, 2014, at 11:22 AM, Tao Xiao xiaotao.cs@gmail.com wrote:

 Hi all,

 I have a Spark-0.9 cluster, which has 16 nodes.

 I wrote a Spark application to read data from an HBase table, which has 86
 regions spreading over 20 RegionServers.

 I submitted the Spark app in Spark standalone mode and found that there
 were 86 executors running on just 3 nodes and it took about 30 minutes to
 read data from the table. In this case, I noticed from the Spark master UI
 that the Locality Level of all executors was PROCESS_LOCAL.

 Later I ran the same app again (without any code changed) and found that
 those 86 executors were running on 16 nodes, and this time it took just 4
 minutes to read data from the same HBase table. In this case, I noticed
 that the Locality Level of most executors was NODE_LOCAL.

 After testing multiple times, I found the two cases above occur randomly.

 So I have 2 questions:
 1)  Why would the two cases above occur randomly when I submitted the same
 application multiple times ?
 2)  Would the spread of executors influence locality level ?

 Thank you.







All executors run on just a few nodes

2014-10-19 Thread Tao Xiao
Hi all,

I have a Spark-0.9 cluster, which has 16 nodes.

I wrote a Spark application to read data from an HBase table, which has 86
regions spreading over 20 RegionServers.

I submitted the Spark app in Spark standalone mode and found that there
were 86 executors running on just 3 nodes and it took about 30 minutes to
read data from the table. In this case, I noticed from the Spark master UI
that the Locality Level of all executors was PROCESS_LOCAL.

Later I ran the same app again (without any code changed) and found that
those 86 executors were running on 16 nodes, and this time it took just 4
minutes to read data from the same HBase table. In this case, I noticed
that the Locality Level of most executors was NODE_LOCAL.

After testing multiple times, I found the two cases above occur randomly.

So I have 2 questions:
1)  Why would the two cases above occur randomly when I submitted the same
application multiple times ?
2)  Would the spread of executors influence locality level ?

Thank you.


Re: ClassNotFoundException was thrown while trying to save rdd

2014-10-13 Thread Tao Xiao
Thanks Akhil. Both ways work for me, but I'd like to know why that
exception was thrown. The class HBaseApp and related classes were all
contained in my application jar, so why was *com.xt.scala.HBaseApp$$*
*anonfun$testHBase$1* not found?

2014-10-13 14:53 GMT+08:00 Akhil Das ak...@sigmoidanalytics.com:

 Adding your application jar to the sparkContext will resolve this issue.

 Eg:
 sparkContext.addJar("./target/scala-2.10/myTestApp_2.10-1.0.jar")

 Thanks
 Best Regards

 On Mon, Oct 13, 2014 at 8:42 AM, Tao Xiao xiaotao.cs@gmail.com
 wrote:

 In the beginning I tried to read HBase and found that exception was
 thrown, then I started to debug the app. I removed the code reading HBase
 and tried to save an rdd containing a list, and the exception was still
 thrown. So I'm sure that exception was not caused by reading HBase.

 While debugging I did not change the object name or file name.



 2014-10-13 0:00 GMT+08:00 Ted Yu yuzhih...@gmail.com:

 Your app is named scala.HBaseApp
 Does it read / write to HBase ?

 Just curious.

 On Sun, Oct 12, 2014 at 8:00 AM, Tao Xiao xiaotao.cs@gmail.com
 wrote:

 Hi all,

 I'm using CDH 5.0.1 (Spark 0.9)  and submitting a job in Spark
 Standalone Cluster mode.

 The job is quite simple as follows:

  object HBaseApp {
    def main(args: Array[String]) {
      testHBase("student", "/test/xt/saveRDD")
    }

    def testHBase(tableName: String, outFile: String) {
      val sparkConf = new SparkConf()
        .setAppName("-- Test HBase --")
        .set("spark.executor.memory", "2g")
        .set("spark.cores.max", "16")

      val sparkContext = new SparkContext(sparkConf)

      val rdd = sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)

      val c = rdd.count // successful
      println("\n\n\n" + c + "\n\n\n")

      rdd.saveAsTextFile(outFile)  // This line will throw
                                   // java.lang.ClassNotFoundException:
                                   // com.xt.scala.HBaseApp$$anonfun$testHBase$1

      println("\n  down  \n")
    }
  }

 I submitted this job using the following script:

 #!/bin/bash

 HBASE_CLASSPATH=$(hbase classpath)
 APP_JAR=/usr/games/spark/xt/SparkDemo-0.0.1-SNAPSHOT.jar

 SPARK_ASSEMBLY_JAR=/usr/games/spark/xt/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar
 SPARK_MASTER=spark://b02.jsepc.com:7077

 CLASSPATH=$CLASSPATH:$APP_JAR:$SPARK_ASSEMBLY_JAR:$HBASE_CLASSPATH
 export SPARK_CLASSPATH=/usr/lib/hbase/lib/*

 CONFIG_OPTS=-Dspark.master=$SPARK_MASTER

 java -cp $CLASSPATH $CONFIG_OPTS com.xt.scala.HBaseApp $@

 After I submitted the job, the count of rdd could be computed
 successfully, but that rdd could not be saved into HDFS and the following
 exception was thrown:

 14/10/11 16:09:33 WARN scheduler.TaskSetManager: Loss was due to
 java.lang.ClassNotFoundException
 java.lang.ClassNotFoundException:
 com.xt.scala.HBaseApp$$anonfun$testHBase$1
  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:270)
  at
 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
  at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
  at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
  at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
  at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
  at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
  at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
  at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
  at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
  at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
  at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798

ClassNotFoundException was thrown while trying to save rdd

2014-10-12 Thread Tao Xiao
Hi all,

I'm using CDH 5.0.1 (Spark 0.9)  and submitting a job in Spark Standalone
Cluster mode.

The job is quite simple as follows:

  object HBaseApp {
    def main(args: Array[String]) {
      testHBase("student", "/test/xt/saveRDD")
    }

    def testHBase(tableName: String, outFile: String) {
      val sparkConf = new SparkConf()
        .setAppName("-- Test HBase --")
        .set("spark.executor.memory", "2g")
        .set("spark.cores.max", "16")

      val sparkContext = new SparkContext(sparkConf)

      val rdd = sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)

      val c = rdd.count // successful
      println("\n\n\n" + c + "\n\n\n")

      rdd.saveAsTextFile(outFile)  // This line will throw
                                   // java.lang.ClassNotFoundException:
                                   // com.xt.scala.HBaseApp$$anonfun$testHBase$1

      println("\n  down  \n")
    }
  }

I submitted this job using the following script:

#!/bin/bash

HBASE_CLASSPATH=$(hbase classpath)
APP_JAR=/usr/games/spark/xt/SparkDemo-0.0.1-SNAPSHOT.jar
SPARK_ASSEMBLY_JAR=/usr/games/spark/xt/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar
SPARK_MASTER=spark://b02.jsepc.com:7077

CLASSPATH=$CLASSPATH:$APP_JAR:$SPARK_ASSEMBLY_JAR:$HBASE_CLASSPATH
export SPARK_CLASSPATH=/usr/lib/hbase/lib/*

CONFIG_OPTS=-Dspark.master=$SPARK_MASTER

java -cp $CLASSPATH $CONFIG_OPTS com.xt.scala.HBaseApp $@

After I submitted the job, the count of rdd could be computed successfully,
but that rdd could not be saved into HDFS and the following exception was
thrown:

14/10/11 16:09:33 WARN scheduler.TaskSetManager: Loss was due to
java.lang.ClassNotFoundException
java.lang.ClassNotFoundException: com.xt.scala.HBaseApp$$anonfun$testHBase$1
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:270)
 at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
 at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
 at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
 at
org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:63)
 at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139)
 at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
 at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
 at

Re: ClassNotFoundException was thrown while trying to save rdd

2014-10-12 Thread Tao Xiao
In the beginning I tried to read HBase and found that exception was thrown,
then I started to debug the app. I removed the code reading HBase and tried
to save an rdd containing a list, and the exception was still thrown. So I'm
sure that exception was not caused by reading HBase.

While debugging I did not change the object name or file name.



2014-10-13 0:00 GMT+08:00 Ted Yu yuzhih...@gmail.com:

 Your app is named scala.HBaseApp
 Does it read / write to HBase ?

 Just curious.

 On Sun, Oct 12, 2014 at 8:00 AM, Tao Xiao xiaotao.cs@gmail.com
 wrote:

 Hi all,

 I'm using CDH 5.0.1 (Spark 0.9)  and submitting a job in Spark Standalone
 Cluster mode.

 The job is quite simple as follows:

  object HBaseApp {
    def main(args: Array[String]) {
      testHBase("student", "/test/xt/saveRDD")
    }

    def testHBase(tableName: String, outFile: String) {
      val sparkConf = new SparkConf()
        .setAppName("-- Test HBase --")
        .set("spark.executor.memory", "2g")
        .set("spark.cores.max", "16")

      val sparkContext = new SparkContext(sparkConf)

      val rdd = sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)

      val c = rdd.count // successful
      println("\n\n\n" + c + "\n\n\n")

      rdd.saveAsTextFile(outFile)  // This line will throw
                                   // java.lang.ClassNotFoundException:
                                   // com.xt.scala.HBaseApp$$anonfun$testHBase$1

      println("\n  down  \n")
    }
  }

 I submitted this job using the following script:

 #!/bin/bash

 HBASE_CLASSPATH=$(hbase classpath)
 APP_JAR=/usr/games/spark/xt/SparkDemo-0.0.1-SNAPSHOT.jar

 SPARK_ASSEMBLY_JAR=/usr/games/spark/xt/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar
 SPARK_MASTER=spark://b02.jsepc.com:7077

 CLASSPATH=$CLASSPATH:$APP_JAR:$SPARK_ASSEMBLY_JAR:$HBASE_CLASSPATH
 export SPARK_CLASSPATH=/usr/lib/hbase/lib/*

 CONFIG_OPTS=-Dspark.master=$SPARK_MASTER

 java -cp $CLASSPATH $CONFIG_OPTS com.xt.scala.HBaseApp $@

 After I submitted the job, the count of rdd could be computed
 successfully, but that rdd could not be saved into HDFS and the following
 exception was thrown:

 14/10/11 16:09:33 WARN scheduler.TaskSetManager: Loss was due to
 java.lang.ClassNotFoundException
 java.lang.ClassNotFoundException:
 com.xt.scala.HBaseApp$$anonfun$testHBase$1
  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
  at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
  at java.security.AccessController.doPrivileged(Native Method)
  at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
  at java.lang.Class.forName0(Native Method)
  at java.lang.Class.forName(Class.java:270)
  at
 org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
  at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
  at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
  at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
  at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
  at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
  at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
  at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
  at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
  at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
  at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
  at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
  at
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40

Re: Reading from HBase is too slow

2014-10-08 Thread Tao Xiao
Hi Sean,

   Do I need to specify the number of executors when submitting the job? I
supposed the number of executors would be determined by the number of regions
of the table, just like a MapReduce job, where you needn't specify the number
of map tasks when reading from an HBase table.

  The script to submit my job can be seen in my second post. Please refer
to that.



2014-10-08 13:44 GMT+08:00 Sean Owen so...@cloudera.com:

 How did you run your program? I don't see from your earlier post that
 you ever asked for more executors.

 On Wed, Oct 8, 2014 at 4:29 AM, Tao Xiao xiaotao.cs@gmail.com wrote:
  I found the reason why reading HBase is too slow. Although each
  regionserver serves multiple regions for the table I'm reading, the number
  of Spark workers allocated by Yarn is too low. Actually, I could see that
  the table has dozens of regions spread over about 20 regionservers, but only
  two Spark workers were allocated by Yarn. What is worse, the two workers ran
  one after another. So, the Spark job lost parallelism.

  So now the question is: why are only 2 workers allocated?



Re: Reading from HBase is too slow

2014-10-08 Thread Tao Xiao
Sean,

I did specify the number of cores to use as follows:

... ...
val sparkConf = new SparkConf()
  .setAppName("Reading HBase")
  .set("spark.cores.max", "32")
val sc = new SparkContext(sparkConf)
... ...



But that does not solve the problem --- only 2 workers are allocated.

I'm using Spark 0.9 and submitting my job in Yarn client mode.
Actually, setting *spark.cores.max* only applies when the job runs on
a *standalone deploy cluster* or a *Mesos cluster in coarse-grained sharing
mode*. Please refer to this link:
http://spark.apache.org/docs/0.9.1/configuration.html

So how to specify the number of executors when submitting a Spark 0.9 job
in Yarn Client mode?
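
In Spark 0.9's YARN modes the executor count came from environment variables
rather than SparkConf, and the documented default for SPARK_WORKER_INSTANCES
was 2, which would explain exactly two workers. A sketch, assuming the 0.9
yarn-client launch path reads these variables (the values are hypothetical):

    export SPARK_WORKER_INSTANCES=20   # hypothetical: one worker per node
    export SPARK_WORKER_CORES=4
    export SPARK_WORKER_MEMORY=2g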

2014-10-08 15:09 GMT+08:00 Sean Owen so...@cloudera.com:

 You do need to specify the number of executor cores to use. Executors are
 not like mappers. After all, they may do much more in their lifetime than
 just read splits from HBase, so it would not make sense to determine their
 number by something that the first line of the program does.
 On Oct 8, 2014 8:00 AM, Tao Xiao xiaotao.cs@gmail.com wrote:

 Hi Sean,

Do I need to specify the number of executors when submitting the job?
 I suppose the number of executors will be determined by the number of
 regions of the table. Just like a MapReduce job, you needn't specify the
 number of map tasks when reading from a HBase table.

   The script to submit my job can be seen in my second post. Please refer
 to that.



 2014-10-08 13:44 GMT+08:00 Sean Owen so...@cloudera.com:

 How did you run your program? I don't see from your earlier post that
 you ever asked for more executors.

 On Wed, Oct 8, 2014 at 4:29 AM, Tao Xiao xiaotao.cs@gmail.com
 wrote:
  I found the reason why reading HBase is too slow. Although each
  regionserver serves multiple regions for the table I'm reading, the number
  of Spark workers allocated by Yarn is too low. Actually, I could see that
  the table has dozens of regions spread over about 20 regionservers, but only
  two Spark workers were allocated by Yarn. What is worse, the two workers ran
  one after another. So, the Spark job lost parallelism.

  So now the question is: why are only 2 workers allocated?





Re: Reading from HBase is too slow

2014-10-07 Thread Tao Xiao
.



2014-10-02 0:58 GMT+08:00 Vladimir Rodionov vrodio...@splicemachine.com:

 Yes, it's in 0.98. CDH is free (w/o subscription) and sometimes it's worth
 upgrading to the latest version (which is 0.98 based).

 -Vladimir Rodionov

 On Wed, Oct 1, 2014 at 9:52 AM, Ted Yu yuzhih...@gmail.com wrote:

 As far as I know, that feature is not in CDH 5.0.0

 FYI

 On Wed, Oct 1, 2014 at 9:34 AM, Vladimir Rodionov 
 vrodio...@splicemachine.com wrote:

 Using TableInputFormat is not the fastest way of reading data from
 HBase. Do not expect 100s of MB per sec. You probably should take a look at
 M/R over HBase snapshots.

 https://issues.apache.org/jira/browse/HBASE-8369

 -Vladimir Rodionov

 On Wed, Oct 1, 2014 at 8:17 AM, Tao Xiao xiaotao.cs@gmail.com
 wrote:

 I can submit a MapReduce job reading that table, although its
 processing rate is also a little slower than I expected, but not as slow
 as Spark.







Re: Reading from HBase is too slow

2014-10-01 Thread Tao Xiao
I can submit a MapReduce job reading that table, although its processing
rate is also a little slower than I expected, but not as slow as Spark.

2014-10-01 12:04 GMT+08:00 Ted Yu yuzhih...@gmail.com:

 Can you launch a job which exercises TableInputFormat on the same table
 without using Spark ?

 This would show whether the slowdown is in HBase code or somewhere else.

 Cheers

 On Mon, Sep 29, 2014 at 11:40 PM, Tao Xiao xiaotao.cs@gmail.com
 wrote:

 I checked the HBase UI. Well, this table is not completely evenly spread
 across the nodes, but I think to some extent it can be seen as nearly
 evenly spread - at least there is not a single node which has too many
 regions. Here is a screenshot of the HBase UI
 http://imgbin.org/index.php?page=image&id=19539.

 Besides, I checked the size of each region in bytes for this table in the
 HBase shell as follows:


 -bash-4.1$ hadoop dfs -du -h /hbase/data/default/C_CONS
 DEPRECATED: Use of this script to execute hdfs command is deprecated.
 Instead use the hdfs command for it.

 288  /hbase/data/default/C_CONS/.tabledesc
 0/hbase/data/default/C_CONS/.tmp
 159.6 M  /hbase/data/default/C_CONS/0008c2494a5399d68495d9c8ae147821
 76.7 M   /hbase/data/default/C_CONS/021d7d21d7faeb7b2a77835d6f86747e
 81.3 M   /hbase/data/default/C_CONS/02a39a316ac6d2bda89e72e74aa18a6e
 155.3 M  /hbase/data/default/C_CONS/02fe51bc077290febc85651d8ee31abc
 173.4 M  /hbase/data/default/C_CONS/045859bcc70e36eb4d33f8ca3b7d9633
 82.6 M   /hbase/data/default/C_CONS/05c868b6036cc4f1836f70be6215c851
 74.1 M   /hbase/data/default/C_CONS/0816378c837f1f3b84f4d4060d22beb3
 84.7 M   /hbase/data/default/C_CONS/083da8f5eb8a5b1cca76376449f357ca
 346.6 M  /hbase/data/default/C_CONS/0ac70fcb1baea0896ea069a6bcc30898
 333.8 M  /hbase/data/default/C_CONS/0b3be845bd4f5e958e8c9a18c8eaab21
 72.7 M   /hbase/data/default/C_CONS/12c13610c50dbc8ab27f20b0ebf2bfc4
 76.1 M   /hbase/data/default/C_CONS/1341966315d7e53be719d948d595bee0
 72.4 M   /hbase/data/default/C_CONS/1acdbc05c502b11da4852a1f21228f44
 70.0 M   /hbase/data/default/C_CONS/1b8f57d65f6c0e4de721e4c8f1944829
 183.9 M  /hbase/data/default/C_CONS/1f1ae7ca9f725fcf9639a4d52086fa50
 65.5 M   /hbase/data/default/C_CONS/20c10b96e2b9c40684aaeb6d0cfbf7c0
 76.0 M   /hbase/data/default/C_CONS/22515194fe09adcd4cbb2f5307303c73
 78.4 M   /hbase/data/default/C_CONS/236cd80393cb5b7c526bd2c45ce53a0a
 150.0 M  /hbase/data/default/C_CONS/23bd80852f47b97b4122709ec844d4ed
 81.6 M   /hbase/data/default/C_CONS/241b8bc415029dedf94c4a84e6c4ad3b
 77.9 M   /hbase/data/default/C_CONS/27f1e59bde75ef3096a5bdd3eb402cd7
 160.8 M  /hbase/data/default/C_CONS/30c2ae3be38b8cdf3b337054a7d61478
 372.2 M  /hbase/data/default/C_CONS/31d606da71b35844d0cdc8a195c97d2e
 182.6 M  /hbase/data/default/C_CONS/3274a022bc7419d426cf63caa1cc88e1
 92.1 M   /hbase/data/default/C_CONS/344faae7971d87b51edf23f75a7c3746
 154.7 M  /hbase/data/default/C_CONS/3b3f0c839bdb32ed2104f67c8a02da41
 77.4 M   /hbase/data/default/C_CONS/3cf6b2bd0cfe85f3111d0ba1b84a60b4
 71.5 M   /hbase/data/default/C_CONS/3f466db078d07e2bfb11c681e0e3
 77.8 M   /hbase/data/default/C_CONS/3f8c1b7dec05118eb9894bb591e32b2f
 83.6 M   /hbase/data/default/C_CONS/45e105856fcb54748c48bd45e973a3b9
 185.2 M  /hbase/data/default/C_CONS/4becd90d46a2d4a6bd8ecbe02b60892c
 165.6 M  /hbase/data/default/C_CONS/4dcebd58c7013062c4a8583012a11b5a
 67.3 M   /hbase/data/default/C_CONS/51f845d842605dda66b1ae01ad8a17e8
 148.2 M  /hbase/data/default/C_CONS/532189155ab78dbd1e36aac3ab4878a8
 172.6 M  /hbase/data/default/C_CONS/5401d9cb19adb9bd78718ea047e6d9d7
 139.4 M  /hbase/data/default/C_CONS/547d2a8c54aae73e8f12b4570efd984c
 89.5 M   /hbase/data/default/C_CONS/54cbac1f71c7781697052bb2aa1c5a18
 101.3 M  /hbase/data/default/C_CONS/55263ce293327683b9c6e6098ec3e89a
 85.2 M   /hbase/data/default/C_CONS/55f8c278e35de6bca5083c7a66e355fb
 85.8 M   /hbase/data/default/C_CONS/57112558912e1de016327e115bc84f11
 171.8 M  /hbase/data/default/C_CONS/572b886cbfe92ddcb97502f041953fb8
 51   /hbase/data/default/C_CONS/6bd64d8cf6b38806731f7693bdd673c9
 86.6 M   /hbase/data/default/C_CONS/7695703b7b527afc5f3524eee9b5d806
 74.8 M   /hbase/data/default/C_CONS/7bb7567685f5e16a4379d7cf79de2ecc
 120.1 M  /hbase/data/default/C_CONS/7c144bef991bb3c959d7ef6e2fa5036a
 166.0 M  /hbase/data/default/C_CONS/7c7817eb3e531d5bda88b5f0de6a20de
 173.5 M  /hbase/data/default/C_CONS/7d07c139575d007ecbb23fa946e39130
 139.2 M  /hbase/data/default/C_CONS/8295aa701110ddf4055e8c3ca5bd9cad
 91.7 M   /hbase/data/default/C_CONS/84b340d22471580ed8100d6614668eb1
 81.2 M   /hbase/data/default/C_CONS/8605f4470498a01a5ec4c88e7ea8a458
 78.3 M   /hbase/data/default/C_CONS/897da8e33275b80926ef38200132f819
 234.4 M  /hbase/data/default/C_CONS/93f5ce30ed8e54cc282cb5b88fa28d76
 126.3 M  /hbase/data/default/C_CONS/96dd1decd62e35c394bb8e7f6095f054
 80.9 M   /hbase/data/default/C_CONS/998364405e57a7eedae094bca76a419e
 184.8 M  /hbase/data/default/C_CONS/9df3b62b1bff59b67b75ad86d694b8c8
 126.6 M  /hbase

Re: Reading from HBase is too slow

2014-09-30 Thread Tao Xiao
/b1ae0451f592b28eed8a58908f91293a
91.5 M   /hbase/data/default/C_CONS/b8396049e2b742108add1485c0eb4aeb
81.2 M   /hbase/data/default/C_CONS/b8d25b3e536b4fea5ee4ee2b21885c76
87.8 M   /hbase/data/default/C_CONS/bbfbe319705df23a23a89b40e52d89a8
81.3 M   /hbase/data/default/C_CONS/bccaeedc65d9295289f78aaec588cc3d
95.8 M   /hbase/data/default/C_CONS/c229d583958802571dfaa9a39453df0d
88.5 M   /hbase/data/default/C_CONS/c9d7a038243d1b3e2448a48007f1f9e0
158.8 M  /hbase/data/default/C_CONS/cca1bf1f013724af25d71ad4310e5d4a
212.8 M  /hbase/data/default/C_CONS/ccabf798734aa8e05798c43c132ad565
85.1 M   /hbase/data/default/C_CONS/d1cb54346e109b1ba76fd95aa4540161
84.4 M   /hbase/data/default/C_CONS/d4dd8c3fa81b751892689cc92a96aa99
139.5 M  /hbase/data/default/C_CONS/dc15ceeed21474b51086f3103cbd0074
97.7 M   /hbase/data/default/C_CONS/df20e2077f22e83ecd8e0d52dea1
221.0 M  /hbase/data/default/C_CONS/e30d0d55e0887a676c8b79e03771ad23
75.7 M   /hbase/data/default/C_CONS/e6ed24ce0b3e1e903bd9757d28380f3a
74.9 M   /hbase/data/default/C_CONS/e9732d9905f5373fb0fd7a1ce033e17b
101.2 M  /hbase/data/default/C_CONS/f2a49dbaf018f0e45bbd7a758f123418
172.6 M  /hbase/data/default/C_CONS/f34645de36d3c1413ce83177e2118947
89.2 M   /hbase/data/default/C_CONS/f3db2bf3b7ffb7b4c0029eac5d631bdb
81.6 M   /hbase/data/default/C_CONS/f43b49c4f384853266e9ee45a98104a6
68.9 M   /hbase/data/default/C_CONS/fa4fb0047ec98fb10bf84fd72937f415
86.7 M   /hbase/data/default/C_CONS/fc69f349655676e046c9110550825f5a
155.0 M  /hbase/data/default/C_CONS/feb0835bdf73c257de11c65f18b1330d
75.2 M   /hbase/data/default/C_CONS/fff9fbe56af8b9e0e00826f8936e7a56



From the result above we can see that the biggest region's size is 346.6 M,
while most other regions' sizes are close to each other.

So what may be the real reason ?

2014-09-30 12:17 GMT+08:00 Vladimir Rodionov vrodio...@splicemachine.com:

 HBase TableInputFormat creates one input split per region. You cannot
 achieve a high level of parallelism unless you have at least 5-10 regions
 per RS. What does it mean? You probably have too few regions. You can
 verify that in the HBase Web UI.

 -Vladimir Rodionov
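
 If adding regions is not an option, a hedged partial workaround is to fan
 out after the scan; the scan itself still runs one task per region, but the
 work downstream of it parallelizes. A sketch:

     // assumption: downstream processing, not the raw scan, dominates runtime
     val rdd = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
         classOf[ImmutableBytesWritable], classOf[Result])
       .repartition(100)  // hypothetical count: ~5-10 partitions per regionserver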

 On Mon, Sep 29, 2014 at 7:21 PM, Tao Xiao xiaotao.cs@gmail.com
 wrote:

 I submitted a job in Yarn-Client mode, which simply reads from an HBase
 table containing tens of millions of records and then does a *count* action.
 The job runs for a much longer time than I expected, so I wonder whether it
 was because the data to read was too much. Actually, there are 20 nodes in
 my Hadoop cluster, so the HBase table seems not so big (tens of millions of
 records).

 I'm using CDH 5.0.0 (Spark 0.9 and HBase 0.96).

 BTW, when the job was running, I could see logs on the console, and
 specifically I'd like to know what the following log means:

 14/09/30 09:45:20 INFO scheduler.TaskSetManager: Starting task 0.0:20 as
 TID 20 on executor 2: b04.jsepc.com (PROCESS_LOCAL)
 14/09/30 09:45:20 INFO scheduler.TaskSetManager: Serialized task 0.0:20
 as 13454 bytes in 0 ms
 14/09/30 09:45:20 INFO scheduler.TaskSetManager: Finished TID 19 in 16426
 ms on b04.jsepc.com (progress: 18/86)
 14/09/30 09:45:20 INFO scheduler.DAGScheduler: Completed ResultTask(0, 19)


 Thanks





Reading from HBase is too slow

2014-09-29 Thread Tao Xiao
I submitted a job in Yarn-Client mode, which simply reads from an HBase
table containing tens of millions of records and then does a *count* action.
The job runs for a much longer time than I expected, so I wonder whether it
was because the data to read was too much. Actually, there are 20 nodes in
my Hadoop cluster, so the HBase table seems not so big (tens of millions of
records).

I'm using CDH 5.0.0 (Spark 0.9 and HBase 0.96).

BTW, when the job was running, I could see logs on the console, and
specifically I'd like to know what the following log means:

14/09/30 09:45:20 INFO scheduler.TaskSetManager: Starting task 0.0:20 as
TID 20 on executor 2: b04.jsepc.com (PROCESS_LOCAL)
14/09/30 09:45:20 INFO scheduler.TaskSetManager: Serialized task 0.0:20 as
13454 bytes in 0 ms
14/09/30 09:45:20 INFO scheduler.TaskSetManager: Finished TID 19 in 16426
ms on b04.jsepc.com (progress: 18/86)
14/09/30 09:45:20 INFO scheduler.DAGScheduler: Completed ResultTask(0, 19)


Thanks


Re: Reading from HBase is too slow

2014-09-29 Thread Tao Xiao
I submitted the job in Yarn-Client mode using the following script:

export
SPARK_JAR=/usr/games/spark/xt/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar

export HADOOP_CLASSPATH=$(hbase classpath)
export
CLASSPATH=$CLASSPATH:/usr/games/spark/xt/SparkDemo-0.0.1-SNAPSHOT.jar:/usr/games/spark/xt/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar:/usr/games/spark/xt/hadoop-common-2.3.0-cdh5.0.1.jar:/usr/games/spark/xt/hbase-client-0.96.1.1-cdh5.0.1.jar:/usr/games/spark/xt/hbase-common-0.96.1.1-cdh5.0.1.jar:/usr/games/spark/xt/hbase-server-0.96.1.1-cdh5.0.1.jar:/usr/games/spark/xt/hbase-protocol-0.96.0-hadoop2.jar:/usr/games/spark/xt/htrace-core-2.01.jar:$HADOOP_CLASSPATH

CONFIG_OPTS="-Dspark.master=yarn-client
-Dspark.jars=/usr/games/spark/xt/SparkDemo-0.0.1-SNAPSHOT.jar,/usr/games/spark/xt/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar,/usr/games/spark/xt/hbase-client-0.96.1.1-cdh5.0.1.jar,/usr/games/spark/xt/hbase-common-0.96.1.1-cdh5.0.1.jar,/usr/games/spark/xt/hbase-server-0.96.1.1-cdh5.0.1.jar,/usr/games/spark/xt/hbase-protocol-0.96.0-hadoop2.jar,/usr/games/spark/xt/htrace-core-2.01.jar"

java -cp $CLASSPATH $CONFIG_OPTS com.xt.scala.TestSpark




My job's code is as follows:


object TestSpark {
  def main(args: Array[String]) {
    readHBase("C_CONS")
  }

  def readHBase(tableName: String) {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)

    val sparkConf = new SparkConf()
      .setAppName("Reading HBase")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    println(rdd.count)
  }
}


2014-09-30 10:21 GMT+08:00 Tao Xiao xiaotao.cs@gmail.com:

 I submitted a job in Yarn-Client mode, which simply reads from an HBase
 table containing tens of millions of records and then does a *count* action.
 The job runs for a much longer time than I expected, so I wonder whether it
 was because the data to read was too much. Actually, there are 20 nodes in
 my Hadoop cluster, so the HBase table seems not so big (tens of millions of
 records).

 I'm using CDH 5.0.0 (Spark 0.9 and HBase 0.96).

 BTW, when the job was running, I could see logs on the console, and
 specifically I'd like to know what the following log means:

 14/09/30 09:45:20 INFO scheduler.TaskSetManager: Starting task 0.0:20 as
 TID 20 on executor 2: b04.jsepc.com (PROCESS_LOCAL)
 14/09/30 09:45:20 INFO scheduler.TaskSetManager: Serialized task 0.0:20 as
 13454 bytes in 0 ms
 14/09/30 09:45:20 INFO scheduler.TaskSetManager: Finished TID 19 in 16426
 ms on b04.jsepc.com (progress: 18/86)
 14/09/30 09:45:20 INFO scheduler.DAGScheduler: Completed ResultTask(0, 19)


 Thanks



How to sort rdd filled with existing data structures?

2014-09-24 Thread Tao Xiao
Hi,

I have the following rdd :

  val conf = new SparkConf()
    .setAppName("Testing Sorting")
  val sc = new SparkContext(conf)

  val L = List(
    (new Student("XiaoTao", 80, 29), "I'm Xiaotao"),
    (new Student("CCC", 100, 24), "I'm CCC"),
    (new Student("Jack", 90, 25), "I'm Jack"),
    (new Student("Tom", 60, 35), "I'm Tom"),
    (new Student("Lucy", 78, 22), "I'm Lucy"))

  val rdd = sc.parallelize(L, 3)


where Student is a class defined as follows:

class Student(val name: String, val score: Int, val age: Int) {

  override def toString =
    "name:" + name + ", score:" + score + ", age:" + age

}



I want to sort the *rdd* by key, but when I wrote rdd.sortByKey it
complained that "No implicit Ordering defined", which means I must extend
the class with *Ordered* and provide a method named *compare*. The
problem is that the class Student is from a third-party library so I cannot
change its definition. I'd like to know if there is a sorting method to
which I can provide a customized compare function so that it can sort the
rdd according to the sorting function I provide.

One more question: if I want to sort RDD[(k, v)] by value, do I have to
map that rdd so that its key and value exchange their positions in the
tuple? Are there any functions that allow us to sort an rdd by things other
than the key?

Thanks
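
An implicit Ordering in scope is usually enough here; Student itself need not
extend Ordered. A sketch, assuming a Spark version whose sortByKey takes a
context-bound Ordering (the "No implicit Ordering defined" message suggests it
does):

    // provide the ordering externally; the third-party class stays untouched
    implicit val byScore: Ordering[Student] = Ordering.by(s => s.score)
    val sortedByKey = rdd.sortByKey()

    // sorting by value without swapping the tuple, assuming RDD.sortBy is
    // available (it appeared in Spark 1.0)
    val sortedByValue = rdd.sortBy { case (_, v) => v }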


Re: combineByKey throws ClassCastException

2014-09-16 Thread Tao Xiao
This problem was caused by the fact that I used a package jar with a Spark
version (0.9.1) different from that of the cluster (0.9.0). When I used the
correct package jar
(spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar) instead, the
application ran as expected.



2014-09-15 14:57 GMT+08:00 x wasedax...@gmail.com:

 How about this.

 scala> val rdd2 = rdd.combineByKey(
      |   (v: Int) => v.toLong,
      |   (c: Long, v: Int) => c + v,
      |   (c1: Long, c2: Long) => c1 + c2)
 rdd2: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[9] at
 combineByKey at <console>:14

 xj @ Tokyo

 On Mon, Sep 15, 2014 at 3:06 PM, Tao Xiao xiaotao.cs@gmail.com
 wrote:

 I followed an example presented in the tutorial Learning Spark
 http://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
 to compute the per-key average as follows:


 val Array(appName) = args
 val sparkConf = new SparkConf()
   .setAppName(appName)
 val sc = new SparkContext(sparkConf)
 /*
  * compute the per-key average of values
  * results should be:
  *   A : 5.8
  *   B : 14
  *   C : 60.6
  */
 val rdd = sc.parallelize(List(
   ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
   ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
   ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2)
 val avg = rdd.combineByKey(
   (x: Int) => (x, 1),  // java.lang.ClassCastException: scala.Tuple2$mcII$sp
                        // cannot be cast to java.lang.Integer
   (acc: (Int, Int), x) => (acc._1 + x, acc._2 + 1),
   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
   .map{ case (s, t) => (s, t._1 / t._2.toFloat) }
 avg.collect.foreach(t => println(t._1 + " - " + t._2))



 When I submitted the application, an exception of
 *java.lang.ClassCastException: scala.Tuple2$mcII$sp cannot be cast to
 java.lang.Integer* was thrown. The tutorial said that the first function
 of *combineByKey*, *(x: Int) => (x, 1)*, should take a single element in the
 source RDD and return an element of the desired type in the resulting RDD.
 In my application, we take a single element of type *Int* from the source
 RDD and return a tuple of type (*Int*, *Int*), which meets the requirements
 quite well. But why would such an exception be thrown?

 I'm using CDH 5.0 and Spark 0.9

 Thanks.






Re: What are the appropriate privileges needed for writing files into the checkpoint directory?

2014-09-03 Thread Tao Xiao
I found the answer. The checkpoint directory must be on a fault-tolerant
file system like HDFS, so we should set it to an HDFS path, not a local
file system path.
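
For example, a sketch with a hypothetical namenode address:

    // any HDFS path visible to all executors works; host and port are placeholders
    ssc.checkpoint("hdfs://namenode.example.com:8020/user/spark/checkpoint/KafkaWordCount")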


2014-09-03 10:28 GMT+08:00 Tao Xiao xiaotao.cs@gmail.com:

 I tried to run KafkaWordCount in a Spark standalone cluster. In this
 application, the checkpoint directory was set as follows:

 val sparkConf = new SparkConf().setAppName("KafkaWordCount")
 val ssc = new StreamingContext(sparkConf, Seconds(2))
 ssc.checkpoint("checkpoint")


 After submitting my application to the cluster, I could see the correct
 counting results on the console, but the running application kept
 complaining as follows:

 14/09/03 10:01:22 WARN TaskSetManager: Loss was due to
 java.io.FileNotFoundException
 java.io.FileNotFoundException:
 /usr/games/SparkStreaming/checkpoint/a03505c8-0183-4bc0-b674-bf0e16767564/rdd-96/.part-0-attempt-171
 (Permission denied)
   at java.io.FileOutputStream.open(Native Method)
   at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
   at
 org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:206)
   at
 org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:202)
   at
 org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:265)
   at
 org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
   at
 org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:384)
   at
 org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:443)
   at
 org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:849)
   at
 org.apache.spark.rdd.CheckpointRDD$.writeToFile(CheckpointRDD.scala:103)
   at
 org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
   at
 org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
   at org.apache.spark.scheduler.Task.run(Task.scala:53)
   at
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
   at
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
   at
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
   at java.security.AccessController.doPrivileged(Native Method)
   at javax.security.auth.Subject.doAs(Subject.java:396)
   at
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
   at
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
   at
 java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
   at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
   at java.lang.Thread.run(Thread.java:662)


 On the node where I submitted the application, the checkpoint directory
 (/usr/games/SparkStreaming/checkpoint) was created and some files were
 created there, but no such directory existed on other nodes of the
 Spark cluster.

 I guess that was because processes on the other nodes of the cluster didn't
 have the appropriate privileges to create the checkpoint directory. So I
 created that directory on each node manually and changed its mode to 777,
 which means any user can write to it. But the Spark Streaming
 application still kept throwing that exception.

 So what is the real reason?  Thanks.






What are the appropriate privileges needed for writing files into the checkpoint directory?

2014-09-02 Thread Tao Xiao
I tried to run KafkaWordCount in a Spark standalone cluster. In this
application, the checkpoint directory was set as follows:

val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")


After submitting my application into the cluster, I could see the correct
counting results on the console, but the running application kept
complaining the following:

14/09/03 10:01:22 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
java.io.FileNotFoundException: /usr/games/SparkStreaming/checkpoint/a03505c8-0183-4bc0-b674-bf0e16767564/rdd-96/.part-0-attempt-171 (Permission denied)
  at java.io.FileOutputStream.open(Native Method)
  at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
  at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:206)
  at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:202)
  at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:265)
  at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
  at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:384)
  at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:443)
  at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:849)
  at org.apache.spark.rdd.CheckpointRDD$.writeToFile(CheckpointRDD.scala:103)
  at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
  at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
  at org.apache.spark.scheduler.Task.run(Task.scala:53)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
  at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
  at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:396)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
  at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
  at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
  at java.lang.Thread.run(Thread.java:662)


On the node where I submitted the application, the checkpoint directory
(/usr/games/SparkStreaming/checkpoint) was created and some files were
created there, but no such directory existed on the other nodes of the
Spark cluster.

I guess that was because processes on the other nodes of the cluster didn't
have the appropriate privileges to create the checkpoint directory. So I
created that directory on each node manually and changed its mode to 777,
which means any user can write to it. But the Spark Streaming
application still kept throwing that exception.

So what is the real reason?  Thanks.


What does appMasterRpcPort: -1 indicate ?

2014-08-31 Thread Tao Xiao
I'm using CDH 5.1.0, which bundles Spark 1.0.0.

Following "How-to: Run a Simple Apache Spark App in CDH 5", I tried to
submit my job in local mode, Spark Standalone mode and YARN mode. I
successfully submitted my job in local mode and Standalone mode; however, I
noticed the following messages printed on the console when I submitted my job
in YARN mode:


14/08/29 22:27:29 INFO Client: Submitting application to ASM

14/08/29 22:27:29 INFO YarnClientImpl: Submitted application
application_1406949333981_0015

14/08/29 22:27:29 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:30 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:31 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:32 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:33 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:34 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:35 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:36 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:37 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:38 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: -1

  appStartTime: 1409365649836

  yarnAppState: ACCEPTED


14/08/29 22:27:39 INFO YarnClientSchedulerBackend: Application report from
ASM:

  appMasterRpcPort: 0

  appStartTime: 1409365649836

  yarnAppState: RUNNING


The job finished successfully and produced correct results,
but I'm not sure what those messages mean. Does appMasterRpcPort: -1
indicate an error or an exception?


Re: What does appMasterRpcPort: -1 indicate ?

2014-08-31 Thread Tao Xiao
Thanks Yi, I think your answers make sense.

We can see a series of messages with "appMasterRpcPort: -1" followed by a
message with "appMasterRpcPort: 0". Perhaps that means we were waiting for
the application master to be started (appMasterRpcPort: -1), and later
the application master got started (appMasterRpcPort: 0).
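
A hedged sketch (not from this thread) of reading the same report fields
directly through the Hadoop 2.x YARN client API; appId is assumed to be
the ApplicationId of the submitted application:

    import org.apache.hadoop.yarn.client.api.YarnClient
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()
    val report = yarnClient.getApplicationReport(appId) // appId assumed given
    println(report.getRpcPort)               // stays -1 until the AM registers
    println(report.getYarnApplicationState)  // ACCEPTED, then RUNNING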


2014-08-31 23:10 GMT+08:00 Yi Tian tianyi.asiai...@gmail.com:

 I think -1 means your application master has not been started yet.


 On Aug 31, 2014, at 23:02, Tao Xiao xiaotao.cs@gmail.com wrote:

 I'm using CDH 5.1.0, which bundles Spark 1.0.0.

 Following "How-to: Run a Simple Apache Spark App in CDH 5", I tried to
 submit my job in local mode, Spark Standalone mode and YARN mode. I
 successfully submitted my job in local mode and Standalone mode; however, I
 noticed the following messages printed on the console when I submitted my job
 in YARN mode:


 14/08/29 22:27:29 INFO Client: Submitting application to ASM

 14/08/29 22:27:29 INFO YarnClientImpl: Submitted application
 application_1406949333981_0015

 14/08/29 22:27:29 INFO YarnClientSchedulerBackend: Application report from
 ASM:

   appMasterRpcPort: -1

   appStartTime: 1409365649836

   yarnAppState: ACCEPTED


 14/08/29 22:27:30 INFO YarnClientSchedulerBackend: Application report from
 ASM:

   appMasterRpcPort: -1

   appStartTime: 1409365649836

   yarnAppState: ACCEPTED


 14/08/29 22:27:31 INFO YarnClientSchedulerBackend: Application report from
 ASM:

   appMasterRpcPort: -1

   appStartTime: 1409365649836

   yarnAppState: ACCEPTED


 14/08/29 22:27:32 INFO YarnClientSchedulerBackend: Application report from
 ASM:

   appMasterRpcPort: -1

   appStartTime: 1409365649836

   yarnAppState: ACCEPTED


 14/08/29 22:27:33 INFO YarnClientSchedulerBackend: Application report from
 ASM:

   appMasterRpcPort: -1

   appStartTime: 1409365649836

   yarnAppState: ACCEPTED


 14/08/29 22:27:34 INFO YarnClientSchedulerBackend: Application report from
 ASM:

   appMasterRpcPort: -1

   appStartTime: 1409365649836

   yarnAppState: ACCEPTED


 14/08/29 22:27:35 INFO YarnClientSchedulerBackend: Application report from
 ASM:

   appMasterRpcPort: -1

   appStartTime: 1409365649836

   yarnAppState: ACCEPTED


 14/08/29 22:27:36 INFO YarnClientSchedulerBackend: Application report from
 ASM:

   appMasterRpcPort: -1

   appStartTime: 1409365649836

   yarnAppState: ACCEPTED


 14/08/29 22:27:37 INFO YarnClientSchedulerBackend: Application report from
 ASM:

   appMasterRpcPort: -1

   appStartTime: 1409365649836

   yarnAppState: ACCEPTED


 14/08/29 22:27:38 INFO YarnClientSchedulerBackend: Application report from
 ASM:

   appMasterRpcPort: -1

   appStartTime: 1409365649836

   yarnAppState: ACCEPTED


 14/08/29 22:27:39 INFO YarnClientSchedulerBackend: Application report from
 ASM:

   appMasterRpcPort: 0

   appStartTime: 1409365649836

   yarnAppState: RUNNING


 The job finished successfully and produced correct results,
 but I'm not sure what those messages mean. Does appMasterRpcPort: -1
 indicate an error or an exception?





Fwd: What does appMasterRpcPort: -1 indicate ?

2014-08-30 Thread Tao Xiao
I'm using CDH 5.1.0, which bundles Spark 1.0.0.

Following "How-to: Run a Simple Apache Spark App in CDH 5"
(http://blog.cloudera.com/blog/2014/04/how-to-run-a-simple-apache-spark-app-in-cdh-5/),
I tried to submit my job in local mode, Spark Standalone mode and YARN
mode. I successfully submitted my job in local mode and Standalone mode;
however, I noticed the following messages printed on the console when I
submitted my job in YARN mode:

 14/08/29 22:27:29 INFO Client: Submitting application to ASM
14/08/29 22:27:29 INFO YarnClientImpl: Submitted application
application_1406949333981_0015
14/08/29 22:27:29 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:30 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:31 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:32 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:33 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:34 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:35 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:36 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:37 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:38 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:39 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: 0
 appStartTime: 1409365649836
 yarnAppState: RUNNING

The job finished successfully and produced correct results,
but I'm not sure what those messages mean. Does appMasterRpcPort: -1
indicate an error or an exception?

Thanks


How can a deserialized Java object be stored on disk?

2014-08-30 Thread Tao Xiao
Reading about RDD Persistence
(https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence),
I learned that the storage level MEMORY_AND_DISK means "Store RDD as
deserialized Java objects in the JVM. If the RDD does not fit in memory,
store the partitions that don't fit on disk, and read them from there when
they're needed."
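
For reference, a minimal sketch of requesting that storage level (the
input path is a placeholder):

    import org.apache.spark.storage.StorageLevel

    val rdd = sc.textFile("hdfs:///data/input.txt") // placeholder path
    rdd.persist(StorageLevel.MEMORY_AND_DISK)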

But how can a deserialized Java object be stored on disk? As far as I
know, a Java object has to be stored as an array of bytes on disk, which
means the Java object must first be converted into an array of bytes (a
serialized object).

Thanks.


How to provide a custom Comparator to sortByKey?

2014-02-28 Thread Tao Xiao
I am using Spark 0.9.
I have an array of tuples, and I want to sort these tuples using the
*sortByKey* API as follows in the Spark shell:

val A: Array[(String, String)] = Array(("1", "One"), ("9", "Nine"),
("3", "three"), ("5", "five"), ("4", "four"))
val P = sc.parallelize(A)

// MyComparator is an example; maybe I have a more complex implementation
class MyComparator extends java.util.Comparator[String] {
  def compare(s1: String, s2: String): Int = {
    s1.compareTo(s2)
  }
}

val comp = new MyComparator()
P.sortByKey(comp, true)


When I invoked P.sortByKey(comp, true), the Spark shell complained that
there was a type mismatch; to be specific, *sortByKey* expects a *Boolean*
but I provided a *Comparator*.

How should I provide my custom comparator to *sortByKey*?
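
A hedged sketch of the usual workaround, assuming a Spark version whose
sortByKey resolves an implicit Ordering[K] on the key type (later releases
do; Spark 0.9's signature used a view bound to Ordered[K]):

    // sortByKey takes no Comparator; it picks up the key ordering from an
    // implicit Ordering[String] in scope (a local implicit wins over the
    // default Ordering.String)
    implicit val myOrdering: Ordering[String] = new Ordering[String] {
      def compare(s1: String, s2: String): Int = s2.compareTo(s1) // e.g. reverse
    }
    P.sortByKey(true) // true = ascending, under the implicit ordering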


Re: Need some tutorials and examples about customized partitioner

2014-02-25 Thread Tao Xiao
Thank you Mayur, I think that will help me a lot


Best,
Tao


2014-02-26 8:56 GMT+08:00 Mayur Rustagi mayur.rust...@gmail.com:

 The type of shuffling is best explained by Matei in Spark Internals:
 http://www.youtube.com/watch?v=49Hr5xZyTEA#t=2203
 Why don't you look at that and then ask follow-up questions here? It would
 also be good to watch the whole talk, as it covers Spark job flows in a lot
 more detail.

 Scala:
 import org.apache.spark.RangePartitioner
 var file = sc.textFile("my local path")
 var partitionedFile = file.map(x => (x, 1))
 var data = partitionedFile.partitionBy(new RangePartitioner(3, partitionedFile))
 data.glom().collect()(0).length
 data.glom().collect()(1).length
 data.glom().collect()(2).length
 This will sample the RDD partitionedFile and then try to partition it into
 partitions of almost equal size. Do not call collect if your data size is
 huge, as this may OOM the driver; write it to disk in that case.
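
 For the customized-partitioner part of the question, a minimal sketch (not
 from this thread; FirstCharPartitioner and its keying scheme are
 illustrative assumptions):

     import org.apache.spark.Partitioner

     class FirstCharPartitioner(parts: Int) extends Partitioner {
       override def numPartitions: Int = parts
       // route each record by the first character of its string key
       override def getPartition(key: Any): Int =
         key.toString.headOption.map(_.toInt % parts).getOrElse(0)
     }

     // usage, e.g.: partitionedFile.partitionBy(new FirstCharPartitioner(3))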




 Mayur Rustagi
 Ph: +919632149971
 http://www.sigmoidanalytics.com
 https://twitter.com/mayur_rustagi



 On Tue, Feb 25, 2014 at 1:19 AM, Tao Xiao xiaotao.cs@gmail.com wrote:

 I am a newbie to Spark and I need to know how RDD partitioning can be
 controlled during shuffling. I have googled for examples but haven't
 found many concrete ones, in contrast with the many good tutorials about
 Hadoop's shuffling and partitioners.

 Can anybody show me good tutorials explaining the process of shuffling in
 Spark, as well as examples of how to use a customized partitioner?


 Best,
 Tao