Re: Array in broadcast can't be serialized
Thanks, Ted.

After searching for a whole day, I still don't know how to make Spark use Twitter chill serialization; there is very little documentation on how to integrate chill into Spark. I tried the following, but it threw java.lang.ClassCastException: com.twitter.chill.WrappedArraySerializer cannot be cast to org.apache.spark.serializer.Serializer:

    val conf = new SparkConf()
      .setAppName("Test Serialization")
      .set("spark.serializer", "com.twitter.chill.WrappedArraySerializer")

So what is the correct way to configure Spark to use the Twitter chill serialization framework?

2015-02-15 22:23 GMT+08:00 Ted Yu yuzhih...@gmail.com:

I was looking at https://github.com/twitter/chill

It seems this would achieve what you want:
chill-scala/src/main/scala/com/twitter/chill/WrappedArraySerializer.scala

Cheers

On Sat, Feb 14, 2015 at 6:36 PM, Tao Xiao xiaotao.cs@gmail.com wrote:

I'm using Spark 1.1.0 and find that *ImmutableBytesWritable* can be serialized by Kryo, but *Array[ImmutableBytesWritable]* can't be serialized even though I registered both of them in Kryo.

The code is as follows:

    val conf = new SparkConf()
      .setAppName("Hello Spark")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "xt.MyKryoRegistrator")

    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(List(
      (new ImmutableBytesWritable(Bytes.toBytes("AAA")), new KeyValue()),
      (new ImmutableBytesWritable(Bytes.toBytes("BBB")), new KeyValue()),
      (new ImmutableBytesWritable(Bytes.toBytes("CCC")), new KeyValue()),
      (new ImmutableBytesWritable(Bytes.toBytes("DDD")), new KeyValue())), 4)

    // snippet 1: a single ImmutableBytesWritable can be serialized in a broadcast
    val partitioner = new SingleElementPartitioner(
      sc.broadcast(new ImmutableBytesWritable(Bytes.toBytes(3))))
    val ret = rdd.aggregateByKey(List[KeyValue](), partitioner)(
      (xs: List[KeyValue], y: KeyValue) => y :: xs,
      (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys
    ).persist()
    println("\n\n\nret.count = " + ret.count + ", partition size = " + ret.partitions.size)

    // snippet 2: an array of ImmutableBytesWritable cannot be serialized in a broadcast
    val arr = Array(
      new ImmutableBytesWritable(Bytes.toBytes(1)),
      new ImmutableBytesWritable(Bytes.toBytes(2)),
      new ImmutableBytesWritable(Bytes.toBytes(3)))
    val newPartitioner = new ArrayPartitioner(sc.broadcast(arr))
    val ret1 = rdd.aggregateByKey(List[KeyValue](), newPartitioner)(
      (xs: List[KeyValue], y: KeyValue) => y :: xs,
      (xs: List[KeyValue], ys: List[KeyValue]) => xs ::: ys
    )
    println("\n\n\nrdd2.count = " + ret1.count)

    sc.stop

    // the following are the Kryo registrator and the partitioners
    class MyKryoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[ImmutableBytesWritable])         // register ImmutableBytesWritable
        kryo.register(classOf[Array[ImmutableBytesWritable]])  // register Array[ImmutableBytesWritable]
      }
    }

    class SingleElementPartitioner(bc: Broadcast[ImmutableBytesWritable]) extends Partitioner {
      override def numPartitions: Int = 5
      def v = Bytes.toInt(bc.value.get)
      override def getPartition(key: Any): Int = v - 1
    }

    class ArrayPartitioner(bc: Broadcast[Array[ImmutableBytesWritable]]) extends Partitioner {
      val arr = bc.value
      override def numPartitions: Int = arr.length
      override def getPartition(key: Any): Int = Bytes.toInt(arr(0).get)
    }

In the code above, snippet 1 works as expected, but snippet 2 throws "Task not serializable: java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable".

So do I have to implement a Kryo serializer for Array[T] if it is used in a broadcast?

Thanks
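One possible explanation for the difference between the two snippets, offered as an assumption rather than something confirmed in this thread: a Partitioner travels to the executors as part of the task, which Spark writes with plain Java serialization, so the Kryo registrations do not apply to the partitioner's own fields. SingleElementPartitioner reads bc.value inside a def, so its only field is the Broadcast handle. ArrayPartitioner copies bc.value into a val, so the array itself becomes a field, and its elements are not java.io.Serializable. The sketch below reproduces that pattern with the JDK only; Payload, Handle, EagerHolder and LazyHolder are hypothetical stand-ins for ImmutableBytesWritable, Broadcast, ArrayPartitioner and a def-based variant of it.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object BroadcastFieldSketch {
  // Stand-in for ImmutableBytesWritable: deliberately NOT java.io.Serializable.
  final class Payload(val bytes: Array[Byte])

  // Stand-in for a Broadcast handle (assumption): the handle is serializable,
  // but its value is transient and is never written by Java serialization.
  final class Handle(@transient val value: Array[Payload]) extends Serializable

  // Mirrors ArrayPartitioner: the array is copied into a field at construction time.
  final class EagerHolder(h: Handle) extends Serializable {
    val arr: Array[Payload] = h.value
    def size: Int = arr.length
  }

  // Mirrors SingleElementPartitioner: the value is looked up on every call,
  // so the only serialized field is the handle.
  final class LazyHolder(h: Handle) extends Serializable {
    def arr: Array[Payload] = h.value
    def size: Int = arr.length
  }

  // True if the object survives java.io serialization.
  def javaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

If this is indeed the cause, changing `val arr = bc.value` to `def arr = bc.value` (or a `@transient lazy val`) in ArrayPartitioner may be enough, with no custom Kryo serializer for Array[T]. As for chill: the ClassCastException suggests that spark.serializer expects a subclass of org.apache.spark.serializer.Serializer, whereas WrappedArraySerializer is a Kryo-level serializer, so it would presumably be passed to kryo.register inside the registrator instead.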
Can not write out data as snappy-compressed files
I'm using CDH 5.1.0 and Spark 1.0.0, and I'd like to write out data as Snappy-compressed files, but I encountered a problem.

My code is as follows:

    val InputTextFilePath = "hdfs://ec2.hadoop.com:8020/xt/text/new.txt"
    val OutputTextFilePath = "hdfs://ec2.hadoop.com:8020/xt/compressedText/"

    val sparkConf = new SparkConf()
      .setAppName("compress data").setMaster(Master)
      .set("spark.executor.memory", "8g")
      .set("spark.cores.max", "12")
      .set("spark.io.compression.codec", "org.apache.spark.io.SnappyCompressionCodec")

    val sc = new SparkContext(sparkConf)

    val rdd = sc.textFile(InputTextFilePath)
    rdd.saveAsTextFile(OutputTextFilePath, classOf[org.apache.hadoop.io.compress.SnappyCodec])

    sc.stop

When I submitted the job, the following exception was thrown:

    14/12/08 21:16:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    14/12/08 21:16:21 WARN TaskSetManager: Lost TID 0 (task 0.0:0)
    14/12/08 21:16:21 WARN TaskSetManager: Loss was due to java.lang.UnsatisfiedLinkError
    java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
        at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
        at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
        at org.apache.hadoop.io.compress.SnappyCodec.createCompressor(SnappyCodec.java:143)
        at org.apache.hadoop.io.compress.SnappyCodec.createOutputStream(SnappyCodec.java:98)
        at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:136)
        at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:89)
        at org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$writeToFile$1(PairRDDFunctions.scala:825)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:840)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:840)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
        at org.apache.spark.scheduler.Task.run(Task.scala:51)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

It seems that Spark could not find Snappy's native library. I searched for solutions on the Internet and tried the following:

1. Adding spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec to */etc/spark/conf.cloudera.spark/spark-defaults.conf*

2. Adding the following settings to */etc/spark/conf.cloudera.spark/spark-env.sh*

    export JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop/lib/native
    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop/lib/native
    export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop/lib/native/libsnappy.so
    export SPARK_CLASSPATH=/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop/lib/snappy-java-1.0.4.1.jar

Yet neither works. Can anybody tell me what I should do to read and write Snappy-compressed files in Spark?

Any answer is appreciated.
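The failing frame, NativeCodeLoader.buildSupportsSnappy, is a native method, so the JVM that runs the task has to load Hadoop's native library as well as Snappy's. Entries of java.library.path are directories, whereas the SPARK_LIBRARY_PATH line in the post ends in the file libsnappy.so; whether that is the actual cause here is only a guess. The following diagnostic sketch lists which directories on a library path contain the two libraries. NativeLibCheck and its members are hypothetical names, and Linux file naming is assumed.

```scala
import java.io.File

object NativeLibCheck {
  // Library file names looked for (assumption: Linux naming, unversioned files or symlinks).
  val wanted: Seq[String] = Seq("libhadoop.so", "libsnappy.so")

  // For each wanted library, the directories on `libraryPath` that contain it.
  // Entries that are not directories (for example a path to a .so file) are ignored,
  // mirroring how java.library.path is interpreted.
  def locate(libraryPath: String): Map[String, Seq[String]] = {
    val dirs = libraryPath.split(File.pathSeparator).toSeq.filter(_.nonEmpty).map(new File(_))
    wanted.map { name =>
      name -> dirs.filter(d => d.isDirectory && new File(d, name).isFile).map(_.getPath)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val path = sys.props.getOrElse("java.library.path", "")
    locate(path).foreach { case (lib, where) =>
      println(s"$lib -> " + (if (where.isEmpty) "not found" else where.mkString(", ")))
    }
  }
}
```

Running the same check inside a task, rather than only on the driver, would show what the executors actually see, since each executor JVM has its own java.library.path.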
A partitionBy problem
Hi all,

I tested the *partitionBy* feature in a word-count application, and I'm puzzled by what I saw. In this application, I created an RDD from some text files in HDFS (about 100 GB in size), each of which has lines composed of words separated by the character #. I wanted to count the occurrences of each distinct word. *All lines have the same contents, so the final result should be very small in bytes.*

The code is as follows:

    val text = sc.textFile(inputDir)
    val tuples = text.flatMap(line => line.split("#"))
      .map((_, 1))
      .reduceByKey(_ + _)
    tuples.collect.foreach { case (word, count) => println(word + " - " + count) }

I submitted the application to a Spark cluster of 5 nodes and ran it in standalone mode. From the application UI http://imgbin.org/index.php?page=imageid=20976, we can see that the shuffle for *collect* and *reduceByKey* used little bandwidth (766.4 KB for *collect*'s shuffle read and 961 KB for *reduceByKey*'s shuffle write).

*However, the shuffle used a lot of bandwidth when I added partitionBy like this:*

    val text = sc.textFile(inputDir)
    val tuples = text.flatMap(line => line.split("#"))
      .map((_, 1))
      .partitionBy(new HashPartitioner(100))
      .reduceByKey(_ + _)
    tuples.collect.foreach { case (word, count) => println(word + " - " + count) }

From the application UI http://imgbin.org/index.php?page=imageid=20977, we can see that the shuffle read for *collect* is 2.8 GB and the shuffle write for *map* is 3.5 GB.

The *map* transformations are applied on 5 nodes of the cluster because the HDFS blocks are distributed among these 5 nodes. They are applied to each element of the RDD on the node where it resides and should not need to shuffle the new RDD. *So my first question is: why did the map transformation use so much bandwidth (3.5 GB) when I added partitionBy to the code?*

When *collect* is applied, it needs to collect the results, namely the (*word*, *totalCount*) tuples, from the 5 nodes to the driver. That process should use very little bandwidth because all lines have the same contents, like AAA#BBB#CCC#DDD, which means the final results retrieved by *collect* should be very small in bytes (for example, hundreds of KB). *So my second question is: why did the collect action use so much bandwidth (2.8 GB) when I added partitionBy to the code?*

*And the third question: when I add partitionBy to an RDD, it returns a new RDD. Does that mean the RDD is immediately shuffled across nodes to meet the requirement specified by the supplied partitioner, or is the supplied partitioner merely a marker indicating how to partition the RDD later?*

Thanks.
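A likely explanation, not confirmed in this thread: reduceByKey merges the values of each key inside every map task before the shuffle files are written, so a task writes at most one record per distinct word. partitionBy does no such combining, so every (word, 1) pair is written to the shuffle and read back on the other side. Since the following reduceByKey can reuse the partitioner that partitionBy installed, it probably needs no second shuffle and runs in the same stage as collect, which would be why the UI attributes the large shuffle read to collect. partitionBy itself is a lazy transformation; nothing moves until an action runs. The sketch below counts shuffled records for both plans on plain Scala collections. No Spark API is used, and ShuffleSketch and its members are hypothetical names.

```scala
object ShuffleSketch {
  type Record = (String, Int)

  // Each inner Seq plays the role of one input partition, i.e. one map task.
  def toPairs(partitions: Seq[Seq[String]]): Seq[Seq[Record]] =
    partitions.map(lines => lines.flatMap(line => line.split("#").toSeq).map(word => (word, 1)))

  // Plan with partitionBy first: every pair is written to the shuffle unchanged.
  def shuffledWithoutCombine(partitions: Seq[Seq[String]]): Int =
    toPairs(partitions).map(_.size).sum

  // Plan with reduceByKey only: pairs are merged per key inside each map task
  // before anything is written, so at most one record per distinct word and task.
  def shuffledWithCombine(partitions: Seq[Seq[String]]): Int =
    toPairs(partitions).map { pairs =>
      pairs.groupBy(_._1).map { case (word, ps) => (word, ps.map(_._2).sum) }.size
    }.sum
}
```

With two partitions of 1000 identical lines AAA#BBB#CCC#DDD each, the first plan shuffles 8000 records and the second only 8, which matches the direction of the numbers reported in the UI, though not their exact sizes.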
Re: How to kill a Spark job running in cluster mode ?
Thanks for your replies. Actually, if you know the driver ID, you can kill a driver with the command

    bin/spark-class org.apache.spark.deploy.Client kill <spark-master> <driver-id>

2014-11-11 22:35 GMT+08:00 Ritesh Kumar Singh riteshoneinamill...@gmail.com:

There is a property, spark.ui.killEnabled, which needs to be set to true for killing applications directly from the web UI.
Check the link: Kill Enable spark job http://spark.apache.org/docs/latest/configuration.html#spark-ui

Thanks

On Tue, Nov 11, 2014 at 7:42 PM, Sonal Goyal sonalgoy...@gmail.com wrote:

The web interface has a kill link. You can try using that.

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co
http://in.linkedin.com/in/sonalgoyal

On Tue, Nov 11, 2014 at 7:28 PM, Tao Xiao xiaotao.cs@gmail.com wrote:

I'm using Spark 1.0.0 and I'd like to kill a job running in cluster mode, which means the driver is not running on the local node.

So how can I kill such a job? Is there a command like hadoop job -kill <job-id>, which kills a running MapReduce job?

Thanks
Re: All executors run on just a few nodes
Raymond,

Thank you. But I read in another thread http://apache-spark-user-list.1001560.n3.nabble.com/When-does-Spark-switch-from-PROCESS-LOCAL-to-NODE-LOCAL-or-RACK-LOCAL-td7091.html that PROCESS_LOCAL means the data is in the same JVM as the code that is running. If the data is in the same JVM as the running code, it must also be on the same node as that JVM, i.e. the data can be said to be local.

You also said that the tasks will be assigned to available executors which satisfy the application's requirements. But what requirements must an executor satisfy so that a task can be assigned to it? Do you mean resources (memory, CPU)?

Finally, is there any way to guarantee that the executors for an application will run on all Spark nodes when the data to be processed is big enough (for example, when the HBase data resides on all RegionServers)?

2014-10-20 11:35 GMT+08:00 raymond rgbbones.m...@gmail.com:

My best guess is that the speed at which your executors register with the driver differs between runs.

When you ran it for the first time, the executors were not fully registered when the task set manager started to assign tasks, so the tasks were assigned to the executors that were already available and satisfied what you need, say 86 in one batch.

And PROCESS_LOCAL does not necessarily mean that the data is local; it could be that the executor is not yet available for the data source (in your case, though it might become available later).

If this is the case, you could just sleep a few seconds before running the job. There are also some related patches that provide another way to sync executor status before running applications, but I haven't tracked their status for a while.

Raymond

On Oct 20, 2014, at 11:22 AM, Tao Xiao xiaotao.cs@gmail.com wrote:

Hi all,

I have a Spark-0.9 cluster, which has 16 nodes. I wrote a Spark application to read data from an HBase table, which has 86 regions spread over 20 RegionServers.

I submitted the Spark app in Spark standalone mode and found that there were 86 executors running on just 3 nodes, and it took about 30 minutes to read the data from the table. In this case, I noticed from the Spark master UI that the Locality Level of all executors was PROCESS_LOCAL.

Later I ran the same app again (without any code changes) and found that those 86 executors were running on 16 nodes, and this time it took just 4 minutes to read the data from the same HBase table. In this case, I noticed that the Locality Level of most executors was NODE_LOCAL.

After testing multiple times, I found that the two cases above occur randomly.

So I have 2 questions:
1) Why would the two cases above occur randomly when I submit the same application multiple times?
2) Would the spread of executors influence the locality level?

Thank you.
Re: ClassNotFoundException was thrown while trying to save rdd
Thanks, Akhil. Both ways work for me, but I'd like to know why that exception was thrown. The class HBaseApp and the related classes were all contained in my application jar, so why was com.xt.scala.HBaseApp$$anonfun$testHBase$1 not found?

2014-10-13 14:53 GMT+08:00 Akhil Das ak...@sigmoidanalytics.com:

Adding your application jar to the SparkContext will resolve this issue.

E.g.:
sparkContext.addJar("./target/scala-2.10/myTestApp_2.10-1.0.jar")

Thanks
Best Regards

On Mon, Oct 13, 2014 at 8:42 AM, Tao Xiao xiaotao.cs@gmail.com wrote:

In the beginning I tried to read HBase and found that the exception was thrown, so I started to debug the app. I removed the code reading HBase and tried to save an RDD containing a list, and the exception was still thrown. So I'm sure that the exception was not caused by reading HBase.

While debugging I did not change the object name or the file name.

2014-10-13 0:00 GMT+08:00 Ted Yu yuzhih...@gmail.com:

Your app is named scala.HBaseApp
Does it read / write to HBase?

Just curious.

On Sun, Oct 12, 2014 at 8:00 AM, Tao Xiao xiaotao.cs@gmail.com wrote:

Hi all,

I'm using CDH 5.0.1 (Spark 0.9) and submitting a job in Spark Standalone Cluster mode.

The job is quite simple:

    object HBaseApp {
      def main(args: Array[String]) {
        testHBase("student", "/test/xt/saveRDD")
      }

      def testHBase(tableName: String, outFile: String) {
        val sparkConf = new SparkConf()
          .setAppName("-- Test HBase --")
          .set("spark.executor.memory", "2g")
          .set("spark.cores.max", "16")

        val sparkContext = new SparkContext(sparkConf)

        val rdd = sparkContext.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)

        val c = rdd.count     // successful
        println("\n\n\n" + c + "\n\n\n")

        rdd.saveAsTextFile(outFile)  // This line will throw java.lang.ClassNotFoundException: com.xt.scala.HBaseApp$$anonfun$testHBase$1

        println("\n down \n")
      }
    }

I submitted this job using the following script:

    #!/bin/bash

    HBASE_CLASSPATH=$(hbase classpath)
    APP_JAR=/usr/games/spark/xt/SparkDemo-0.0.1-SNAPSHOT.jar

    SPARK_ASSEMBLY_JAR=/usr/games/spark/xt/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar
    SPARK_MASTER=spark://b02.jsepc.com:7077

    CLASSPATH=$CLASSPATH:$APP_JAR:$SPARK_ASSEMBLY_JAR:$HBASE_CLASSPATH
    export SPARK_CLASSPATH=/usr/lib/hbase/lib/*

    CONFIG_OPTS=-Dspark.master=$SPARK_MASTER

    java -cp $CLASSPATH $CONFIG_OPTS com.xt.scala.HBaseApp $@

After I submitted the job, the count of the RDD was computed successfully, but the RDD could not be saved to HDFS and the following exception was thrown:

    14/10/11 16:09:33 WARN scheduler.TaskSetManager: Loss was due to java.lang.ClassNotFoundException
    java.lang.ClassNotFoundException: com.xt.scala.HBaseApp$$anonfun$testHBase$1
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:270)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
        at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:63)
        at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
        at
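On the "why" question, one reading of the submit script, stated here as an assumption: `java -cp` puts the application jar on the driver's classpath only, while the executors load task classes from the jars registered with the SparkContext. The closure class com.xt.scala.HBaseApp$$anonfun$testHBase$1 then exists on the driver but cannot be resolved when a task is deserialized on an executor, which is consistent with addJar fixing it. The sketch below uses plain JVM reflection to find the jar a class was loaded from, so that the path does not have to be hard-coded; JarLocator is a hypothetical helper, not part of Spark.

```scala
object JarLocator {
  // Path of the jar (or class directory) a class was loaded from, if the JVM knows it.
  // Classes from the JDK itself usually have no code source, so the result is None.
  def locationOf(cls: Class[_]): Option[String] =
    for {
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource)
      url    <- Option(source.getLocation)
    } yield url.getPath
}
```

In the application this could be used as `JarLocator.locationOf(HBaseApp.getClass).foreach(sparkContext.addJar)`, or the resulting path could be passed to SparkConf.setJars before the context is created.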
Re: Reading from HBase is too slow
Hi Sean,

Do I need to specify the number of executors when submitting the job? I assumed the number of executors would be determined by the number of regions in the table, just as a MapReduce job reading from an HBase table does not need the number of map tasks specified.

The script I use to submit the job is in my second post; please refer to that.

2014-10-08 13:44 GMT+08:00 Sean Owen so...@cloudera.com:

How did you run your program? I don't see from your earlier post that you ever asked for more executors.

On Wed, Oct 8, 2014 at 4:29 AM, Tao Xiao xiaotao.cs@gmail.com wrote:

I found the reason why reading HBase is too slow. Although each regionserver serves multiple regions of the table I'm reading, the number of Spark workers allocated by Yarn is too low. The table has dozens of regions spread over about 20 regionservers, but Yarn allocates only two Spark workers. Worse, the two workers run one after the other, so the Spark job loses its parallelism.

So now the question is: why are only 2 workers allocated?
Re: Reading from HBase is too slow
Sean,

I did specify the number of cores to use, as follows:

    ... ...
    val sparkConf = new SparkConf()
      .setAppName("Reading HBase")
      .set("spark.cores.max", "32")
    val sc = new SparkContext(sparkConf)
    ... ...

But that does not solve the problem: only 2 workers are allocated.

I'm using Spark 0.9 and submitting my job in Yarn client mode. Setting *spark.cores.max* only applies when the job runs on a *standalone deploy cluster* or a *Mesos cluster in coarse-grained sharing mode*. Please refer to this link: http://spark.apache.org/docs/0.9.1/configuration.html

So how do I specify the number of executors when submitting a Spark 0.9 job in Yarn client mode?

2014-10-08 15:09 GMT+08:00 Sean Owen so...@cloudera.com:

You do need to specify the number of executor cores to use. Executors are not like mappers. After all, they may do much more in their lifetime than just read splits from HBase, so it would not make sense to determine their number by something that the first line of the program does.

On Oct 8, 2014 8:00 AM, Tao Xiao xiaotao.cs@gmail.com wrote:

Hi Sean,

Do I need to specify the number of executors when submitting the job? I assumed the number of executors would be determined by the number of regions in the table, just as a MapReduce job reading from an HBase table does not need the number of map tasks specified.

The script I use to submit the job is in my second post; please refer to that.

2014-10-08 13:44 GMT+08:00 Sean Owen so...@cloudera.com:

How did you run your program? I don't see from your earlier post that you ever asked for more executors.

On Wed, Oct 8, 2014 at 4:29 AM, Tao Xiao xiaotao.cs@gmail.com wrote:

I found the reason why reading HBase is too slow. Although each regionserver serves multiple regions of the table I'm reading, the number of Spark workers allocated by Yarn is too low. The table has dozens of regions spread over about 20 regionservers, but Yarn allocates only two Spark workers. Worse, the two workers run one after the other, so the Spark job loses its parallelism.

So now the question is: why are only 2 workers allocated?
Re: Reading from HBase is too slow
2014-10-02 0:58 GMT+08:00 Vladimir Rodionov vrodio...@splicemachine.com:

Yes, it's in 0.98. CDH is free (w/o subscription) and sometimes it's worth upgrading to the latest version (which is 0.98 based).

-Vladimir Rodionov

On Wed, Oct 1, 2014 at 9:52 AM, Ted Yu yuzhih...@gmail.com wrote:

As far as I know, that feature is not in CDH 5.0.0

FYI

On Wed, Oct 1, 2014 at 9:34 AM, Vladimir Rodionov vrodio...@splicemachine.com wrote:

Using TableInputFormat is not the fastest way of reading data from HBase. Do not expect 100s of Mb per sec. You probably should take a look at M/R over HBase snapshots.

https://issues.apache.org/jira/browse/HBASE-8369

-Vladimir Rodionov

On Wed, Oct 1, 2014 at 8:17 AM, Tao Xiao xiaotao.cs@gmail.com wrote:

I can submit a MapReduce job reading that table. Its processing rate is also a little slower than I expected, but not as slow as Spark.
Re: Reading from HBase is too slow
I can submit a MapReduce job reading that table. Its processing rate is also a little slower than I expected, but not as slow as Spark.

2014-10-01 12:04 GMT+08:00 Ted Yu yuzhih...@gmail.com:

Can you launch a job which exercises TableInputFormat on the same table without using Spark? This would show whether the slowdown is in HBase code or somewhere else.

Cheers

On Mon, Sep 29, 2014 at 11:40 PM, Tao Xiao xiaotao.cs@gmail.com wrote:

I checked the HBase UI. The table is not completely evenly spread across the nodes, but I think it can be seen as nearly evenly spread: at least no single node has too many regions. Here is a screenshot of the HBase UI: http://imgbin.org/index.php?page=imageid=19539.

Besides, I checked the size in bytes of each region of this table from the command line, as follows:

    -bash-4.1$ hadoop dfs -du -h /hbase/data/default/C_CONS
    DEPRECATED: Use of this script to execute hdfs command is deprecated.
    Instead use the hdfs command for it.
    288      /hbase/data/default/C_CONS/.tabledesc
    0        /hbase/data/default/C_CONS/.tmp
    159.6 M  /hbase/data/default/C_CONS/0008c2494a5399d68495d9c8ae147821
    76.7 M   /hbase/data/default/C_CONS/021d7d21d7faeb7b2a77835d6f86747e
    81.3 M   /hbase/data/default/C_CONS/02a39a316ac6d2bda89e72e74aa18a6e
    155.3 M  /hbase/data/default/C_CONS/02fe51bc077290febc85651d8ee31abc
    173.4 M  /hbase/data/default/C_CONS/045859bcc70e36eb4d33f8ca3b7d9633
    82.6 M   /hbase/data/default/C_CONS/05c868b6036cc4f1836f70be6215c851
    74.1 M   /hbase/data/default/C_CONS/0816378c837f1f3b84f4d4060d22beb3
    84.7 M   /hbase/data/default/C_CONS/083da8f5eb8a5b1cca76376449f357ca
    346.6 M  /hbase/data/default/C_CONS/0ac70fcb1baea0896ea069a6bcc30898
    333.8 M  /hbase/data/default/C_CONS/0b3be845bd4f5e958e8c9a18c8eaab21
    72.7 M   /hbase/data/default/C_CONS/12c13610c50dbc8ab27f20b0ebf2bfc4
    76.1 M   /hbase/data/default/C_CONS/1341966315d7e53be719d948d595bee0
    72.4 M   /hbase/data/default/C_CONS/1acdbc05c502b11da4852a1f21228f44
    70.0 M   /hbase/data/default/C_CONS/1b8f57d65f6c0e4de721e4c8f1944829
    183.9 M  /hbase/data/default/C_CONS/1f1ae7ca9f725fcf9639a4d52086fa50
    65.5 M   /hbase/data/default/C_CONS/20c10b96e2b9c40684aaeb6d0cfbf7c0
    76.0 M   /hbase/data/default/C_CONS/22515194fe09adcd4cbb2f5307303c73
    78.4 M   /hbase/data/default/C_CONS/236cd80393cb5b7c526bd2c45ce53a0a
    150.0 M  /hbase/data/default/C_CONS/23bd80852f47b97b4122709ec844d4ed
    81.6 M   /hbase/data/default/C_CONS/241b8bc415029dedf94c4a84e6c4ad3b
    77.9 M   /hbase/data/default/C_CONS/27f1e59bde75ef3096a5bdd3eb402cd7
    160.8 M  /hbase/data/default/C_CONS/30c2ae3be38b8cdf3b337054a7d61478
    372.2 M  /hbase/data/default/C_CONS/31d606da71b35844d0cdc8a195c97d2e
    182.6 M  /hbase/data/default/C_CONS/3274a022bc7419d426cf63caa1cc88e1
    92.1 M   /hbase/data/default/C_CONS/344faae7971d87b51edf23f75a7c3746
    154.7 M  /hbase/data/default/C_CONS/3b3f0c839bdb32ed2104f67c8a02da41
    77.4 M   /hbase/data/default/C_CONS/3cf6b2bd0cfe85f3111d0ba1b84a60b4
    71.5 M   /hbase/data/default/C_CONS/3f466db078d07e2bfb11c681e0e3
    77.8 M   /hbase/data/default/C_CONS/3f8c1b7dec05118eb9894bb591e32b2f
    83.6 M   /hbase/data/default/C_CONS/45e105856fcb54748c48bd45e973a3b9
    185.2 M  /hbase/data/default/C_CONS/4becd90d46a2d4a6bd8ecbe02b60892c
    165.6 M  /hbase/data/default/C_CONS/4dcebd58c7013062c4a8583012a11b5a
    67.3 M   /hbase/data/default/C_CONS/51f845d842605dda66b1ae01ad8a17e8
    148.2 M  /hbase/data/default/C_CONS/532189155ab78dbd1e36aac3ab4878a8
    172.6 M  /hbase/data/default/C_CONS/5401d9cb19adb9bd78718ea047e6d9d7
    139.4 M  /hbase/data/default/C_CONS/547d2a8c54aae73e8f12b4570efd984c
    89.5 M   /hbase/data/default/C_CONS/54cbac1f71c7781697052bb2aa1c5a18
    101.3 M  /hbase/data/default/C_CONS/55263ce293327683b9c6e6098ec3e89a
    85.2 M   /hbase/data/default/C_CONS/55f8c278e35de6bca5083c7a66e355fb
    85.8 M   /hbase/data/default/C_CONS/57112558912e1de016327e115bc84f11
    171.8 M  /hbase/data/default/C_CONS/572b886cbfe92ddcb97502f041953fb8
    51       /hbase/data/default/C_CONS/6bd64d8cf6b38806731f7693bdd673c9
    86.6 M   /hbase/data/default/C_CONS/7695703b7b527afc5f3524eee9b5d806
    74.8 M   /hbase/data/default/C_CONS/7bb7567685f5e16a4379d7cf79de2ecc
    120.1 M  /hbase/data/default/C_CONS/7c144bef991bb3c959d7ef6e2fa5036a
    166.0 M  /hbase/data/default/C_CONS/7c7817eb3e531d5bda88b5f0de6a20de
    173.5 M  /hbase/data/default/C_CONS/7d07c139575d007ecbb23fa946e39130
    139.2 M  /hbase/data/default/C_CONS/8295aa701110ddf4055e8c3ca5bd9cad
    91.7 M   /hbase/data/default/C_CONS/84b340d22471580ed8100d6614668eb1
    81.2 M   /hbase/data/default/C_CONS/8605f4470498a01a5ec4c88e7ea8a458
    78.3 M   /hbase/data/default/C_CONS/897da8e33275b80926ef38200132f819
    234.4 M  /hbase/data/default/C_CONS/93f5ce30ed8e54cc282cb5b88fa28d76
    126.3 M  /hbase/data/default/C_CONS/96dd1decd62e35c394bb8e7f6095f054
    80.9 M   /hbase/data/default/C_CONS/998364405e57a7eedae094bca76a419e
    184.8 M  /hbase/data/default/C_CONS/9df3b62b1bff59b67b75ad86d694b8c8
    126.6 M  /hbase
Re: Reading from HBase is too slow
             /b1ae0451f592b28eed8a58908f91293a
    91.5 M   /hbase/data/default/C_CONS/b8396049e2b742108add1485c0eb4aeb
    81.2 M   /hbase/data/default/C_CONS/b8d25b3e536b4fea5ee4ee2b21885c76
    87.8 M   /hbase/data/default/C_CONS/bbfbe319705df23a23a89b40e52d89a8
    81.3 M   /hbase/data/default/C_CONS/bccaeedc65d9295289f78aaec588cc3d
    95.8 M   /hbase/data/default/C_CONS/c229d583958802571dfaa9a39453df0d
    88.5 M   /hbase/data/default/C_CONS/c9d7a038243d1b3e2448a48007f1f9e0
    158.8 M  /hbase/data/default/C_CONS/cca1bf1f013724af25d71ad4310e5d4a
    212.8 M  /hbase/data/default/C_CONS/ccabf798734aa8e05798c43c132ad565
    85.1 M   /hbase/data/default/C_CONS/d1cb54346e109b1ba76fd95aa4540161
    84.4 M   /hbase/data/default/C_CONS/d4dd8c3fa81b751892689cc92a96aa99
    139.5 M  /hbase/data/default/C_CONS/dc15ceeed21474b51086f3103cbd0074
    97.7 M   /hbase/data/default/C_CONS/df20e2077f22e83ecd8e0d52dea1
    221.0 M  /hbase/data/default/C_CONS/e30d0d55e0887a676c8b79e03771ad23
    75.7 M   /hbase/data/default/C_CONS/e6ed24ce0b3e1e903bd9757d28380f3a
    74.9 M   /hbase/data/default/C_CONS/e9732d9905f5373fb0fd7a1ce033e17b
    101.2 M  /hbase/data/default/C_CONS/f2a49dbaf018f0e45bbd7a758f123418
    172.6 M  /hbase/data/default/C_CONS/f34645de36d3c1413ce83177e2118947
    89.2 M   /hbase/data/default/C_CONS/f3db2bf3b7ffb7b4c0029eac5d631bdb
    81.6 M   /hbase/data/default/C_CONS/f43b49c4f384853266e9ee45a98104a6
    68.9 M   /hbase/data/default/C_CONS/fa4fb0047ec98fb10bf84fd72937f415
    86.7 M   /hbase/data/default/C_CONS/fc69f349655676e046c9110550825f5a
    155.0 M  /hbase/data/default/C_CONS/feb0835bdf73c257de11c65f18b1330d
    75.2 M   /hbase/data/default/C_CONS/fff9fbe56af8b9e0e00826f8936e7a56

From the result above we can see that the biggest region listed is 372.2 M, while most other regions are close to each other in size. So what might the real reason be?

2014-09-30 12:17 GMT+08:00 Vladimir Rodionov vrodio...@splicemachine.com:

HBase TableInputFormat creates one input split per region. You cannot achieve a high level of parallelism unless you have at least 5-10 regions per RS. What does that mean? You probably have too few regions. You can verify that in the HBase Web UI.

-Vladimir Rodionov

On Mon, Sep 29, 2014 at 7:21 PM, Tao Xiao xiaotao.cs@gmail.com wrote:

I submitted a job in yarn-client mode that simply reads from an HBase table containing tens of millions of records and then runs a *count* action. The job runs much longer than I expected, so I wonder whether there is simply too much data to read. There are 20 nodes in my Hadoop cluster, though, so the table (tens of millions of records) does not seem that big.

I'm using CDH 5.0.0 (Spark 0.9 and HBase 0.96).

BTW, while the job was running I could see logs on the console, and I'd specifically like to know what the following log lines mean:

    14/09/30 09:45:20 INFO scheduler.TaskSetManager: Starting task 0.0:20 as TID 20 on executor 2: b04.jsepc.com (PROCESS_LOCAL)
    14/09/30 09:45:20 INFO scheduler.TaskSetManager: Serialized task 0.0:20 as 13454 bytes in 0 ms
    14/09/30 09:45:20 INFO scheduler.TaskSetManager: Finished TID 19 in 16426 ms on b04.jsepc.com (progress: 18/86)
    14/09/30 09:45:20 INFO scheduler.DAGScheduler: Completed ResultTask(0, 19)

Thanks
Reading from HBase is too slow
I submitted a job in yarn-client mode that simply reads from an HBase table containing tens of millions of records and then runs a *count* action. The job runs much longer than I expected, so I wonder whether there is simply too much data to read. There are 20 nodes in my Hadoop cluster, though, so the table (tens of millions of records) does not seem that big.

I'm using CDH 5.0.0 (Spark 0.9 and HBase 0.96).

BTW, while the job was running I could see logs on the console, and I'd specifically like to know what the following log lines mean:

    14/09/30 09:45:20 INFO scheduler.TaskSetManager: Starting task 0.0:20 as TID 20 on executor 2: b04.jsepc.com (PROCESS_LOCAL)
    14/09/30 09:45:20 INFO scheduler.TaskSetManager: Serialized task 0.0:20 as 13454 bytes in 0 ms
    14/09/30 09:45:20 INFO scheduler.TaskSetManager: Finished TID 19 in 16426 ms on b04.jsepc.com (progress: 18/86)
    14/09/30 09:45:20 INFO scheduler.DAGScheduler: Completed ResultTask(0, 19)

Thanks
Re: Reading from HBase is too slow
I submitted the job in yarn-client mode using the following script:

    export SPARK_JAR=/usr/games/spark/xt/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar
    export HADOOP_CLASSPATH=$(hbase classpath)
    export CLASSPATH=$CLASSPATH:/usr/games/spark/xt/SparkDemo-0.0.1-SNAPSHOT.jar:/usr/games/spark/xt/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar:/usr/games/spark/xt/hadoop-common-2.3.0-cdh5.0.1.jar:/usr/games/spark/xt/hbase-client-0.96.1.1-cdh5.0.1.jar:/usr/games/spark/xt/hbase-common-0.96.1.1-cdh5.0.1.jar:/usr/games/spark/xt/hbase-server-0.96.1.1-cdh5.0.1.jar:/usr/games/spark/xt/hbase-protocol-0.96.0-hadoop2.jar:/usr/games/spark/xt/htrace-core-2.01.jar:$HADOOP_CLASSPATH
    CONFIG_OPTS="-Dspark.master=yarn-client -Dspark.jars=/usr/games/spark/xt/SparkDemo-0.0.1-SNAPSHOT.jar,/usr/games/spark/xt/spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar,/usr/games/spark/xt/hbase-client-0.96.1.1-cdh5.0.1.jar,/usr/games/spark/xt/hbase-common-0.96.1.1-cdh5.0.1.jar,/usr/games/spark/xt/hbase-server-0.96.1.1-cdh5.0.1.jar,/usr/games/spark/xt/hbase-protocol-0.96.0-hadoop2.jar,/usr/games/spark/xt/htrace-core-2.01.jar"
    java -cp $CLASSPATH $CONFIG_OPTS com.xt.scala.TestSpark

My job's code is as follows:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.spark.{SparkConf, SparkContext}

    object TestSpark {
      def main(args: Array[String]) {
        readHBase("C_CONS")
      }

      def readHBase(tableName: String) {
        // Point TableInputFormat at the table to scan
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)

        val sparkConf = new SparkConf()
          .setAppName("Reading HBase")
        val sc = new SparkContext(sparkConf)

        // One input split (and so one task) per region
        val rdd = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
          classOf[ImmutableBytesWritable], classOf[Result])
        println(rdd.count)
      }
    }

2014-09-30 10:21 GMT+08:00 Tao Xiao xiaotao.cs@gmail.com:

I submitted a job in yarn-client mode that simply reads from an HBase table containing tens of millions of records and then runs a *count* action. The job runs much longer than I expected, so I wonder whether there is simply too much data to read. There are 20 nodes in my Hadoop cluster, though, so the table (tens of millions of records) does not seem that big.

I'm using CDH 5.0.0 (Spark 0.9 and HBase 0.96).

BTW, while the job was running I could see logs on the console, and I'd specifically like to know what the following log lines mean:

    14/09/30 09:45:20 INFO scheduler.TaskSetManager: Starting task 0.0:20 as TID 20 on executor 2: b04.jsepc.com (PROCESS_LOCAL)
    14/09/30 09:45:20 INFO scheduler.TaskSetManager: Serialized task 0.0:20 as 13454 bytes in 0 ms
    14/09/30 09:45:20 INFO scheduler.TaskSetManager: Finished TID 19 in 16426 ms on b04.jsepc.com (progress: 18/86)
    14/09/30 09:45:20 INFO scheduler.DAGScheduler: Completed ResultTask(0, 19)

Thanks
How to sort rdd filled with existing data structures?
Hi,

I have the following rdd:

    val conf = new SparkConf()
      .setAppName("Testing Sorting")
    val sc = new SparkContext(conf)

    val L = List(
      (new Student("XiaoTao", 80, 29), "I'm Xiaotao"),
      (new Student("CCC", 100, 24), "I'm CCC"),
      (new Student("Jack", 90, 25), "I'm Jack"),
      (new Student("Tom", 60, 35), "I'm Tom"),
      (new Student("Lucy", 78, 22), "I'm Lucy"))

    val rdd = sc.parallelize(L, 3)

where Student is a class defined as follows:

    class Student(val name: String, val score: Int, val age: Int) {
      override def toString = "name:" + name + ", score:" + score + ", age:" + age
    }

I want to sort the *rdd* by key, but when I wrote rdd.sortByKey the compiler complained "No implicit Ordering defined", which I take to mean that I must extend the class with *Ordered* and provide a method named *compare*. The problem is that the class Student comes from a third-party library, so I cannot change its definition. Is there a sorting method that accepts a custom compare function, so that the rdd is sorted according to the function I provide?

One more question: if I want to sort an RDD[(k, v)] by value, do I have to map the rdd so that key and value swap positions in the tuple? Are there any functions that sort an rdd by something other than the key?

Thanks
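One way to approach the question above, sketched in plain Scala on a local List rather than an RDD: an Ordering[Student] can be declared outside the class, so the third-party definition stays untouched. The ordering rule (by score, then name) and the object name StudentSorting are assumptions made for illustration. Whether sortByKey picks up such an implicit depends on the Spark release; the "No implicit Ordering defined" message suggests it does in the version used here, but that is not confirmed in this thread.

```scala
// Stand-in for the third-party class from the post; its definition is not modified.
class Student(val name: String, val score: Int, val age: Int) {
  override def toString = "name:" + name + ", score:" + score + ", age:" + age
}

object StudentSorting {
  // Hypothetical rule: order by score, break ties by name. Declared outside Student.
  implicit val byScore: Ordering[Student] =
    Ordering.by((s: Student) => (s.score, s.name))

  val pairs: List[(Student, String)] = List(
    (new Student("XiaoTao", 80, 29), "I'm Xiaotao"),
    (new Student("CCC", 100, 24), "I'm CCC"),
    (new Student("Jack", 90, 25), "I'm Jack"),
    (new Student("Tom", 60, 35), "I'm Tom"),
    (new Student("Lucy", 78, 22), "I'm Lucy"))

  // Sort by key: the implicit Ordering[Student] above is resolved here.
  val sortedByKey: List[(Student, String)] = pairs.sortBy(_._1)

  // Sort by value: key the sort on the second tuple element, no swap needed locally.
  val sortedByValue: List[(Student, String)] = pairs.sortBy(_._2)
}
```

On an RDD the same implicit would need to be in scope at the sortByKey call site; sorting by value there may still require mapping to (v, k) first, depending on which sorting methods the Spark release offers.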
Re: combineByKey throws ClassCastException
This problem was caused by the fact that I built my package jar against a Spark version (0.9.1) different from that of the cluster (0.9.0). When I used the correct jar (spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar) instead, the application ran as expected.

2014-09-15 14:57 GMT+08:00 x wasedax...@gmail.com:

How about this.

    scala> val rdd2 = rdd.combineByKey(
         |   (v: Int) => v.toLong,
         |   (c: Long, v: Int) => c + v,
         |   (c1: Long, c2: Long) => c1 + c2)
    rdd2: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[9] at combineByKey at <console>:14

xj @ Tokyo

On Mon, Sep 15, 2014 at 3:06 PM, Tao Xiao xiaotao.cs@gmail.com wrote:

I followed an example presented in the tutorial Learning Spark http://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html to compute the per-key average as follows:

    val Array(appName) = args
    val sparkConf = new SparkConf()
      .setAppName(appName)
    val sc = new SparkContext(sparkConf)

    /*
     * compute the per-key average of values
     * results should be:
     *   A : 5.8
     *   B : 14
     *   C : 60.6
     */
    val rdd = sc.parallelize(List(
      ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
      ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
      ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2)

    val avg = rdd.combineByKey(
        (x: Int) => (x, 1),  // java.lang.ClassCastException: scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer
        (acc: (Int, Int), x) => (acc._1 + x, acc._2 + 1),
        (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
      .map { case (s, t) => (s, t._1 / t._2.toFloat) }

    avg.collect.foreach(t => println(t._1 + " - " + t._2))

When I submitted the application, a *java.lang.ClassCastException: scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer* was thrown.

The tutorial says that the first function of *combineByKey*, *(x: Int) => (x, 1)*, should take a single element of the source RDD and return an element of the desired type in the resulting RDD. In my application, we take a single element of type *Int* from the source RDD and return a tuple of type (*Int*, *Int*), which meets that requirement. So why is such an exception thrown?

I'm using CDH 5.0 and Spark 0.9

Thanks.
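The three functions in the original post do satisfy the combineByKey contract, which is consistent with the cause turning out to be a jar/cluster version mismatch rather than the code. As a sanity check, here is a local, single-machine model of that contract in plain Scala. It is a sketch only: combineByKeyLocal and CombineByKeyModel are hypothetical names, and this is not how Spark implements the operation.

```scala
object CombineByKeyModel {
  // Local model of combineByKey: per-partition combining, then a merge across partitions.
  def combineByKeyLocal[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Within a partition: the first value seen for a key creates the combiner,
    // later values for that key are merged into it.
    val perPartition = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.updated(k, acc.get(k).map(c => mergeValue(c, v)).getOrElse(createCombiner(v)))
      }
    }
    // Across partitions: combiners for the same key are merged pairwise.
    perPartition.foldLeft(Map.empty[K, C]) { (merged, m) =>
      m.foldLeft(merged) { case (acc, (k, c)) =>
        acc.updated(k, acc.get(k).map(prev => mergeCombiners(prev, c)).getOrElse(c))
      }
    }
  }

  val data: List[(String, Int)] = List(
    ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
    ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
    ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55))

  // Two chunks, loosely mimicking parallelize(..., 2); the exact split is an assumption.
  val partitions: List[List[(String, Int)]] = data.grouped(8).toList

  // Same three functions as in the post: build (sum, count) per key.
  val sumCount: Map[String, (Int, Int)] = combineByKeyLocal(partitions)(
    (x: Int) => (x, 1),
    (acc: (Int, Int), x: Int) => (acc._1 + x, acc._2 + 1),
    (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))

  val avg: Map[String, Double] = sumCount.map { case (k, (s, n)) => (k, s.toDouble / n) }
}
```

The averages produced by this model match the values listed in the post's comment (5.8, 14 and 60.6 for A, B and C).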
Re: What are the appropriate privileges needed for writing files into checkpoint directory?
I found the answer. The checkpoint directory must live on a fault-tolerant file system such as HDFS, so it should be set to an HDFS path, not a local file system path.

2014-09-03 10:28 GMT+08:00 Tao Xiao xiaotao.cs@gmail.com:

I tried to run KafkaWordCount in a Spark standalone cluster. In this application, the checkpoint directory was set as follows:

    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

After submitting my application to the cluster, I could see the correct counting results on the console, but the running application kept reporting the following:

    14/09/03 10:01:22 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
    java.io.FileNotFoundException: /usr/games/SparkStreaming/checkpoint/a03505c8-0183-4bc0-b674-bf0e16767564/rdd-96/.part-0-attempt-171 (Permission denied)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
        at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:206)
        at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:202)
        at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:265)
        at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
        at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:384)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:443)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:849)
        at org.apache.spark.rdd.CheckpointRDD$.writeToFile(CheckpointRDD.scala:103)
        at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
        at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
        at java.lang.Thread.run(Thread.java:662)

On the node where I submitted the application, the checkpoint directory (/usr/games/SparkStreaming/checkpoint) was created and some files were created in it, but no such directory existed on the other nodes of the Spark cluster.

I guessed that processes on the other nodes didn't have the privileges needed to create the checkpoint directory. So I created that directory manually on each node and changed its mode to 777, which means any user can write to it. But the Spark Streaming application still kept throwing that exception.

So what is the real reason? Thanks.
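The fix described in the reply above amounts to passing a fully qualified HDFS URI to ssc.checkpoint instead of a bare relative path. A small guard like the following could catch the mistake before the job starts. This is a sketch in plain Scala: the object name, the helper isSharedCheckpointDir, the default scheme set and the example namenode address are all assumptions, not part of Spark.

```scala
import java.net.URI

object CheckpointPath {
  // Returns true only when the path names an explicitly shared file system scheme.
  // Scheme-less paths such as "checkpoint" are rejected because they resolve against
  // whatever default file system the cluster is configured with (local, in this thread).
  def isSharedCheckpointDir(dir: String, sharedSchemes: Set[String] = Set("hdfs")): Boolean =
    Option(new URI(dir).getScheme).map(_.toLowerCase).exists(sharedSchemes.contains)
}
```

With this check, "checkpoint" and "/usr/games/SparkStreaming/checkpoint" are rejected, while a path of the form hdfs://namenode:8020/user/tao/checkpoint (a made-up address) is accepted.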
What are the appropriate privileges needed for writing files into checkpoint directory?
I tried to run KafkaWordCount in a Spark standalone cluster. In this application, the checkpoint directory was set as follows:

    val sparkConf = new SparkConf().setAppName("KafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")

After submitting my application to the cluster, I could see the correct counting results on the console, but the running application kept reporting the following:

    14/09/03 10:01:22 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
    java.io.FileNotFoundException: /usr/games/SparkStreaming/checkpoint/a03505c8-0183-4bc0-b674-bf0e16767564/rdd-96/.part-0-attempt-171 (Permission denied)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:194)
        at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:206)
        at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:202)
        at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:265)
        at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
        at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:384)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:443)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:849)
        at org.apache.spark.rdd.CheckpointRDD$.writeToFile(CheckpointRDD.scala:103)
        at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
        at org.apache.spark.rdd.RDDCheckpointData$$anonfun$doCheckpoint$1.apply(RDDCheckpointData.scala:96)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
        at org.apache.spark.scheduler.Task.run(Task.scala:53)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
        at java.lang.Thread.run(Thread.java:662)

On the node where I submitted the application, the checkpoint directory (/usr/games/SparkStreaming/checkpoint) was created and some files were created in it, but no such directory existed on the other nodes of the Spark cluster.

I guessed that processes on the other nodes didn't have the privileges needed to create the checkpoint directory. So I created that directory manually on each node and changed its mode to 777, which means any user can write to it. But the Spark Streaming application still kept throwing that exception.

So what is the real reason? Thanks.
What does appMasterRpcPort: -1 indicate ?
I'm using CDH 5.1.0, which bundles Spark 1.0.0. Following "How-to: Run a Simple Apache Spark App in CDH 5", I tried to submit my job in local mode, Spark Standalone mode and YARN mode. I successfully submitted my job in local mode and Standalone mode. However, I noticed the following messages on the console when I submitted my job in YARN mode:

    14/08/29 22:27:29 INFO Client: Submitting application to ASM
    14/08/29 22:27:29 INFO YarnClientImpl: Submitted application application_1406949333981_0015
    14/08/29 22:27:29 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:30 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:31 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:32 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:33 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:34 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:35 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:36 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:37 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:38 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:39 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: 0 appStartTime: 1409365649836 yarnAppState: RUNNING

The job finished successfully and produced correct results, but I'm not sure what those messages mean. Does appMasterRpcPort: -1 indicate an error or exception?
Re: What does appMasterRpcPort: -1 indicate ?
Thanks Yi, I think your answer makes sense. We see a series of messages with appMasterRpcPort: -1 followed by one with appMasterRpcPort: 0. Perhaps that means we were waiting for the application master to start (appMasterRpcPort: -1), and then it started (appMasterRpcPort: 0).

2014-08-31 23:10 GMT+08:00 Yi Tian tianyi.asiai...@gmail.com:

I think -1 means your application master has not been started yet.

On Aug 31, 2014, at 23:02, Tao Xiao xiaotao.cs@gmail.com wrote:

I'm using CDH 5.1.0, which bundles Spark 1.0.0. Following "How-to: Run a Simple Apache Spark App in CDH 5", I tried to submit my job in local mode, Spark Standalone mode and YARN mode. I successfully submitted my job in local mode and Standalone mode. However, I noticed the following messages on the console when I submitted my job in YARN mode:

    14/08/29 22:27:29 INFO Client: Submitting application to ASM
    14/08/29 22:27:29 INFO YarnClientImpl: Submitted application application_1406949333981_0015
    14/08/29 22:27:29 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:30 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:31 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:32 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:33 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:34 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:35 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:36 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:37 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:38 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:39 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: 0 appStartTime: 1409365649836 yarnAppState: RUNNING

The job finished successfully and produced correct results, but I'm not sure what those messages mean. Does appMasterRpcPort: -1 indicate an error or exception?
Fwd: What does appMasterRpcPort: -1 indicate ?
I'm using CDH 5.1.0, which bundles Spark 1.0.0. Following "How-to: Run a Simple Apache Spark App in CDH 5" (http://blog.cloudera.com/blog/2014/04/how-to-run-a-simple-apache-spark-app-in-cdh-5/), I tried to submit my job in local mode, Spark Standalone mode and YARN mode. I successfully submitted my job in local mode and Standalone mode. However, I noticed the following messages on the console when I submitted my job in YARN mode:

    14/08/29 22:27:29 INFO Client: Submitting application to ASM
    14/08/29 22:27:29 INFO YarnClientImpl: Submitted application application_1406949333981_0015
    14/08/29 22:27:29 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:30 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:31 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:32 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:33 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:34 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:35 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:36 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:37 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:38 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
    14/08/29 22:27:39 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: 0 appStartTime: 1409365649836 yarnAppState: RUNNING

The job finished successfully and produced correct results, but I'm not sure what those messages mean. Does appMasterRpcPort: -1 indicate an error or exception?

Thanks
How can a deserialized Java object be stored on disk?
Reading about RDD Persistence (https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence), I learned that the storage level MEMORY_AND_DISK means: "Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed."

But how can a deserialized Java object be stored on disk? As far as I know, a Java object has to be stored on disk as an array of bytes, which means the object must first be converted into an array of bytes (a serialized object).

Thanks.
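To make the byte-array point concrete, here is a minimal sketch in plain Scala using standard Java serialization. It does not use Spark's own code path; the names SpillSketch, toBytes and fromBytes are made up for illustration. My understanding is that MEMORY_AND_DISK keeps partitions as ordinary objects while they fit in memory and serializes only the partitions that get written to disk, so "deserialized" describes the in-memory form, not the on-disk form.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Illustrative only: this is NOT Spark's implementation, just the general idea
// that an object must become bytes before it can be written to disk.
object SpillSketch {
  // Turn an in-memory object into a byte array (the form that can go to disk).
  def toBytes(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Rebuild the object from its bytes (what happens when the data is read back).
  def fromBytes[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val partition = Vector("a" -> 1, "b" -> 2)  // stand-in for one RDD partition
    val bytes = toBytes(partition)              // serialized form
    val restored = fromBytes[Vector[(String, Int)]](bytes)
    println(restored == partition)
  }
}
```

The round trip gives back an equal object, which is all a storage layer needs in order to keep objects in memory and bytes on disk.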
How to provide a custom Comparator to sortByKey?
I am using Spark 0.9. I have an array of tuples, and I want to sort them using the *sortByKey* API, as follows in the Spark shell:

val A: Array[(String, String)] = Array(("1", "One"), ("9", "Nine"), ("3", "three"), ("5", "five"), ("4", "four"))
val P = sc.parallelize(A)

// MyComparator is just an example; the real implementation may be more complex
class MyComparator extends java.util.Comparator[String] {
  def compare(s1: String, s2: String): Int = s1.compareTo(s2)
}

val comp = new MyComparator()
P.sortByKey(comp, true)

When I invoked P.sortByKey(comp, true), the Spark shell complained about a type mismatch: *sortByKey* requires a *Boolean* but I provided a *Comparator*. How should I pass my custom comparator to *sortByKey*?
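One possible direction, sketched in plain Scala without Spark: wrap the Comparator in a scala.math.Ordering. I believe Spark releases from 1.0 onward resolve an implicit Ordering[K] for sortByKey, but I have not checked what 0.9 does, so treat the Spark part (shown only as a comment) as an assumption. NumericStringComparator and toOrdering are hypothetical names introduced for this example.

```scala
import java.util.Comparator

object OrderingSketch {
  // Hypothetical comparator: orders numeric strings by numeric value, not lexicographically.
  class NumericStringComparator extends Comparator[String] with Serializable {
    def compare(s1: String, s2: String): Int = Integer.compare(s1.toInt, s2.toInt)
  }

  // Adapt any java.util.Comparator to a scala.math.Ordering.
  def toOrdering[T](comp: Comparator[T]): Ordering[T] = new Ordering[T] {
    def compare(a: T, b: T): Int = comp.compare(a, b)
  }

  def main(args: Array[String]): Unit = {
    val keyOrdering: Ordering[String] = toOrdering(new NumericStringComparator)
    val pairs = Seq("10" -> "ten", "9" -> "nine", "3" -> "three")

    // Plain Scala collections accept the ordering explicitly.
    val sorted = pairs.sortBy(_._1)(keyOrdering)
    println(sorted)

    // Assumption (Spark 1.0+, unverified for 0.9): with the ordering declared as
    //   implicit val keyOrdering: Ordering[String] = ...
    // a call such as P.sortByKey(true) would pick it up for the keys.
  }
}
```

The adapter keeps the existing comparator logic untouched, which helps if the real comparator is more complex than this one.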
Re: Need some tutorials and examples about customized partitioner
Thank you Mayur, I think that will help me a lot.

Best,
Tao

2014-02-26 8:56 GMT+08:00 Mayur Rustagi mayur.rust...@gmail.com:

The types of shuffling are best explained by Matei in Spark Internals: http://www.youtube.com/watch?v=49Hr5xZyTEA#t=2203
Why don't you look at that, and then ask here if you have follow-up questions? It would also be good to watch the whole talk, as it covers Spark job flows in a lot more detail.

import org.apache.spark.RangePartitioner

// Build a pair RDD keyed by line so that it can be partitioned
val file = sc.textFile("my local path")
val partitionedFile = file.map(x => (x, 1))
val data = partitionedFile.partitionBy(new RangePartitioner(3, partitionedFile))

// Check how many elements ended up in each of the 3 partitions
data.glom().collect()(0).length
data.glom().collect()(1).length
data.glom().collect()(2).length

This will sample the RDD partitionedFile and then try to split it into partitions of roughly equal size. Do not call collect if your data is huge, as this may OOM the driver; write it to disk in that case.

Mayur Rustagi
Ph: +919632149971
http://www.sigmoidanalytics.com
https://twitter.com/mayur_rustagi

On Tue, Feb 25, 2014 at 1:19 AM, Tao Xiao xiaotao.cs@gmail.com wrote:

I am a newbie to Spark and I need to know how RDD partitioning can be controlled during shuffling. I have googled for examples but haven't found many concrete ones, whereas there are many good tutorials about Hadoop's shuffling and partitioner. Can anybody point me to good tutorials explaining the process of shuffling in Spark, as well as examples of how to use a customized partitioner?

Best,
Tao
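Since the thread asks for a customized partitioner and the reply only shows the built-in RangePartitioner, here is a minimal sketch of a custom one. It assumes Spark is on the classpath and relies only on the org.apache.spark.Partitioner abstract class (numPartitions and getPartition). FirstCharPartitioner and its routing rule are invented for illustration and are not taken from the thread.

```scala
import org.apache.spark.Partitioner

// Hypothetical partitioner: routes String keys by their first character.
// Any key that is not a non-empty String goes to partition 0.
class FirstCharPartitioner(parts: Int) extends Partitioner {
  require(parts > 0, "number of partitions must be positive")

  override def numPartitions: Int = parts

  override def getPartition(key: Any): Int = key match {
    case s: String if s.nonEmpty => s.charAt(0).toInt % parts // char codes are non-negative
    case _                       => 0
  }

  // Two partitioners that route keys identically should compare equal,
  // which lets Spark recognise RDDs that are already partitioned the same way.
  override def equals(other: Any): Boolean = other match {
    case p: FirstCharPartitioner => p.numPartitions == numPartitions
    case _                       => false
  }

  override def hashCode: Int = numPartitions
}
```

With the pair RDD from the reply above, it could be applied as partitionedFile.partitionBy(new FirstCharPartitioner(3)). Whether first-character routing gives balanced partitions depends entirely on the data, so this is a starting point rather than a recommendation.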