Complex transformation on a dataframe column
Hi,

I have searched around but could not find a satisfying answer to this question: what is the best way to do a complex transformation on a dataframe column? For example, I have a dataframe with the following schema and a function with pretty complex logic to format addresses. I would like to use the function to format each address and store the output as an additional column in the dataframe. What is the best way to do it? Use DataFrame.map? Define a UDF? A code example would be appreciated.

Input dataframe:

root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- PhoneNumber: string (nullable = true)
 |-- Address: string (nullable = true)

Output dataframe:

root
 |-- ID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- PhoneNumber: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- FormattedAddress: string (nullable = true)

The function for formatting addresses:

def formatAddress(address: String): String

Best regards,
Hao Wang
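A minimal sketch of the UDF approach for the schema above (assuming Spark 1.4+, that df is the input dataframe from the question, and that formatAddress is already implemented as a plain Scala function; variable names are illustrative only):

import org.apache.spark.sql.functions.udf

// Wrap the existing formatting logic in a UDF.
val formatAddressUdf = udf((address: String) => formatAddress(address))

// Add the derived column; all existing columns and their types are kept as-is.
val withFormatted = df.withColumn("FormattedAddress", formatAddressUdf(df("Address")))
withFormatted.printSchema()

Compared with DataFrame.map, which yields an RDD of Rows and forces the schema to be rebuilt afterwards, withColumn plus a UDF keeps the original columns and nullability intact.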
Re: How to convert dataframe to a nested StructType schema
Thanks, Terry. This is exactly what I need :)

Hao

On Tue, Sep 15, 2015 at 8:47 PM, Terry Hole <hujie.ea...@gmail.com> wrote:
> Hao,
>
> For spark 1.4.1, you can try this:
> val rowrdd = df.rdd.map(r => Row(Row(r(3)), Row(r(0), r(1), r(2))))
> val newDF = sqlContext.createDataFrame(rowrdd, yourNewSchema)
>
> Thanks!
>
> - Terry
>
> On Wed, Sep 16, 2015 at 2:10 AM, Hao Wang <billhao.l...@gmail.com> wrote:
>
>> Hi,
>>
>> I created a dataframe with 4 string columns (city, state, country, zipcode).
>> I then applied the following nested schema to it by creating a custom
>> StructType. When I run df.take(5), it gives the exception below as expected.
>> The question is how I can convert the Rows in the dataframe to conform to
>> this nested schema? Thanks!
>>
>> root
>>  |-- ZipCode: struct (nullable = true)
>>  |    |-- zip: string (nullable = true)
>>  |-- Address: struct (nullable = true)
>>  |    |-- city: string (nullable = true)
>>  |    |-- state: string (nullable = true)
>>  |    |-- country: string (nullable = true)
>>
>> [info] org.apache.spark.SparkException: Job aborted due to stage failure:
>> Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in
>> stage 6.0 (TID 6, localhost): scala.MatchError: 95123 (of class java.lang.String)
>> [info] at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:178)
>> [info] at org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
>> [info] at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:180)
>> [info] at org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
>> [info] at org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
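For readers landing on this thread, a slightly fuller sketch of the same idea (assuming Spark 1.4.x, an existing sqlContext, and that df is the original four-column dataframe; the nested schema is written out to match the printSchema output quoted above):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Nested target schema matching the printSchema output above.
val nestedSchema = StructType(Seq(
  StructField("ZipCode", StructType(Seq(
    StructField("zip", StringType, nullable = true))), nullable = true),
  StructField("Address", StructType(Seq(
    StructField("city", StringType, nullable = true),
    StructField("state", StringType, nullable = true),
    StructField("country", StringType, nullable = true))), nullable = true)))

// Rebuild each flat Row (city, state, country, zipcode) as two nested Rows.
val nestedRows = df.rdd.map(r => Row(Row(r(3)), Row(r(0), r(1), r(2))))
val nestedDF = sqlContext.createDataFrame(nestedRows, nestedSchema)
nestedDF.printSchema()

The scala.MatchError: 95123 above is what happens when the nested schema is applied to Rows that are still flat; the map is the step that actually reshapes each Row before createDataFrame re-applies the schema.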
How to convert dataframe to a nested StructType schema
Hi,

I created a dataframe with 4 string columns (city, state, country, zipcode). I then applied the following nested schema to it by creating a custom StructType. When I run df.take(5), it gives the exception below as expected. The question is how I can convert the Rows in the dataframe to conform to this nested schema? Thanks!

root
 |-- ZipCode: struct (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- Address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- country: string (nullable = true)

[info] org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6, localhost): scala.MatchError: 95123 (of class java.lang.String)
[info] at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:178)
[info] at org.apache.spark.sql.catalyst.CatalystTypeConverters$.convertRowWithConverters(CatalystTypeConverters.scala:348)
[info] at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$4.apply(CatalystTypeConverters.scala:180)
[info] at org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
[info] at org.apache.spark.sql.SQLContext$$anonfun$9.apply(SQLContext.scala:488)
Re: How to split log data into different files according to severity
Thanks for the link. I'm still running 1.3.1 but will give it a try :)

Hao

On Jun 13, 2015, at 9:38 AM, Will Briggs <wrbri...@gmail.com> wrote:

Check out this recent post by Cheng Lian regarding dynamic partitioning in Spark 1.4: https://www.mail-archive.com/user@spark.apache.org/msg30204.html

On June 13, 2015, at 5:41 AM, Hao Wang <bill...@gmail.com> wrote:

Hi,

I have a bunch of large log files on Hadoop. Each line contains a log and its severity. Is there a way that I can use Spark to split the entire data set into different files on Hadoop according to the severity field? Thanks.

Below is an example of the input and output.

Input:
[ERROR] log1
[INFO] log2
[ERROR] log3
[INFO] log4

Output:
error_file
[ERROR] log1
[ERROR] log3

info_file
[INFO] log2
[INFO] log4

Best,
Hao Wang
Re: How to split log data into different files according to severity
I am currently using filter inside a loop over all severity levels to do this, which I think is pretty inefficient. It has to read the entire data set once for each severity. I wonder if there is a more efficient way that takes just one pass over the data? Thanks.

Best,
Hao Wang

On Jun 13, 2015, at 3:48 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:

Are you looking for something like filter? See a similar example here: https://spark.apache.org/examples.html

Thanks
Best Regards

On Sat, Jun 13, 2015 at 3:11 PM, Hao Wang <bill...@gmail.com> wrote:

Hi,

I have a bunch of large log files on Hadoop. Each line contains a log and its severity. Is there a way that I can use Spark to split the entire data set into different files on Hadoop according to the severity field? Thanks.

Below is an example of the input and output.

Input:
[ERROR] log1
[INFO] log2
[ERROR] log3
[INFO] log4

Output:
error_file
[ERROR] log1
[ERROR] log3

info_file
[INFO] log2
[INFO] log4

Best,
Hao Wang
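One way to do this in a single pass on Spark 1.3.x is the classic MultipleTextOutputFormat trick: key each line by its severity and let Hadoop route records into per-key directories. A rough sketch, assuming the [LEVEL] prefix shown in the example; the input and output paths are hypothetical placeholders:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Routes each (severity, line) pair into a sub-directory named after the severity.
class SeverityOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  // Drop the key from the output lines; TextOutputFormat skips NullWritable keys.
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
  // e.g. ERROR/part-00000, INFO/part-00001, ...
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString + "/" + name
}

val lines = sc.textFile("hdfs:///logs/input")          // hypothetical input path
val keyed = lines.map { line =>
  val severity = line.drop(1).takeWhile(_ != ']')      // "[ERROR] log1" -> "ERROR"
  (severity, line)
}
keyed.saveAsHadoopFile("hdfs:///logs/by-severity",     // hypothetical output path
  classOf[String], classOf[String], classOf[SeverityOutputFormat])

Since Spark 1.4 the same effect is easier to get through dynamic partitioning on the DataFrame writer (the post linked in the reply above), but the RDD approach works on 1.3.1 and still reads the data only once.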
How to split log data into different files according to severity
Hi,

I have a bunch of large log files on Hadoop. Each line contains a log and its severity. Is there a way that I can use Spark to split the entire data set into different files on Hadoop according to the severity field? Thanks.

Below is an example of the input and output.

Input:
[ERROR] log1
[INFO] log2
[ERROR] log3
[INFO] log4

Output:
error_file
[ERROR] log1
[ERROR] log3

info_file
[INFO] log2
[INFO] log4

Best,
Hao Wang
Re: Kryo deserialisation error
Hi, all

Yes, it's the name of a Wikipedia article. I am running the WikipediaPageRank example from Spark Bagel. I am wondering whether there is any relation to the Kryo buffer size. The PageRank job can sometimes finish successfully; sometimes it cannot, because this kind of Kryo exception happens too many times and exceeds maxTaskFailures. I also find this "Kryo exception: unable to find class" in my successful runs.

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com

On Thu, Jul 17, 2014 at 4:44 PM, Sean Owen <so...@cloudera.com> wrote:

Not sure if this helps, but it does seem to be part of a name in a Wikipedia article, and Wikipedia is the data set. So something is reading this class name from the data.
http://en.wikipedia.org/wiki/Carl_Fridtjof_Rode

On Thu, Jul 17, 2014 at 9:40 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

Seems like there is some sort of stream corruption, causing Kryo read to read a weird class name from the stream (the name "arl Fridtjof Rode" in the exception cannot be a class!). Not sure how to debug this. @Patrick: Any idea?
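If the buffer-size suspicion is worth testing, the buffer can be raised when the SparkConf is built. A small sketch (64 is an arbitrary placeholder; on the 1.0.x line the property takes its value in megabytes):

val sparkConf = new org.apache.spark.SparkConf()
  .setAppName("WikipediaPageRank")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer.mb", "64")  // Kryo's serialization buffer in MB; 24 is used later in this thread

A buffer that is too small normally fails with an explicit "Buffer overflow" KryoException rather than a corrupted class name, so if raising it does not help, the stream-corruption theory above is the more likely explanation.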
Re: Kryo deserialisation error
Thanks for your reply. The SparkContext is configured as below:

sparkConf.setAppName("WikipediaPageRank")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.kryo.registrator", classOf[PRKryoRegistrator].getName)
val inputFile = args(0)
val threshold = args(1).toDouble
val numPartitions = args(2).toInt
val usePartitioner = args(3).toBoolean
sparkConf.setAppName("WikipediaPageRank")
sparkConf.set("spark.executor.memory", "60g")
sparkConf.set("spark.cores.max", "48")
sparkConf.set("spark.kryoserializer.buffer.mb", "24")
val sc = new SparkContext(sparkConf)
sc.addJar("~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar")

And I use spark-submit to run the application:

./bin/spark-submit --master spark://sing12:7077 --total-executor-cores 40 --executor-memory 40g --class org.apache.spark.examples.bagel.WikipediaPageRank ~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar hdfs://192.168.1.12:9000/freebase-26G 1 200 True

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com

On Wed, Jul 16, 2014 at 1:41 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

Are you using classes from external libraries that have not been added to the SparkContext, using sparkContext.addJar()?

TD

On Tue, Jul 15, 2014 at 8:36 PM, Hao Wang <wh.s...@gmail.com> wrote:

I am running the WikipediaPageRank Spark example and see the same problem as you:

14/07/16 11:31:06 DEBUG DAGScheduler: submitStage(Stage 6)
14/07/16 11:31:06 ERROR TaskSetManager: Task 6.0:450 failed 4 times; aborting job
14/07/16 11:31:06 INFO DAGScheduler: Failed to run foreach at Bagel.scala:251
Exception in thread "main" 14/07/16 11:31:06 INFO TaskSchedulerImpl: Cancelling stage 6
org.apache.spark.SparkException: Job aborted due to stage failure: Task 6.0:450 failed 4 times, most recent failure: Exception failure in TID 1330 on host sing11: com.esotericsoftware.kryo.KryoException: Unable to find class: arl Fridtjof Rode
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
com.twitter.chill.TraversableSerializer.read(Traversable.scala:44)
com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:115)
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)

Anyone could help?

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com

On Tue, Jun 3, 2014 at 8:02 PM, Denes <te...@outlook.com> wrote:

I tried to use Kryo as a serialiser in spark streaming, did everything according to the guide posted on the spark website, i.e. added the following lines:

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrator", "MyKryoRegistrator");

I also added the necessary classes to the MyKryoRegistrator. However I get the following strange error, can someone help me out where to look for a solution?

14/06/03 09:00:49 ERROR scheduler.JobScheduler: Error running job streaming job 140177880 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while deserializing and fetching task: com.esotericsoftware.kryo.KryoException: Unable to find class: J
Serialization trace: id (org.apache.spark.storage.GetBlock)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59
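Since the thread keeps coming back to what is registered with Kryo, here is a minimal sketch of what a registrator like the ones mentioned above (PRKryoRegistrator, MyKryoRegistrator) typically looks like. The registered case classes are hypothetical placeholders, not the actual classes from the Bagel example:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical application classes standing in for whatever actually travels through the shuffle.
case class PageLink(title: String)
case class PageVertex(title: String, rank: Double, outEdges: Array[PageLink])

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Register every class that ends up inside shuffled, cached or broadcast data.
    kryo.register(classOf[PageLink])
    kryo.register(classOf[PageVertex])
  }
}

It is wired in exactly as in the configuration quoted above, via sparkConf.set("spark.kryo.registrator", classOf[MyKryoRegistrator].getName). A genuinely unregistered or missing class would normally fail with its real name, so the garbled name in this thread points more toward stream corruption, as TD suggests.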
Re: Kryo deserialisation error
I am not sure. Not every task fails with this Kryo exception. Most of the time the cluster can successfully finish the WikipediaPageRank. How could I debug this exception? Thanks.

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com

On Thu, Jul 17, 2014 at 2:58 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

Is the class that is not found in the wikipediapagerank jar?

TD

On Wed, Jul 16, 2014 at 12:32 AM, Hao Wang <wh.s...@gmail.com> wrote:

Thanks for your reply. The SparkContext is configured as below:

sparkConf.setAppName("WikipediaPageRank")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.kryo.registrator", classOf[PRKryoRegistrator].getName)
val inputFile = args(0)
val threshold = args(1).toDouble
val numPartitions = args(2).toInt
val usePartitioner = args(3).toBoolean
sparkConf.setAppName("WikipediaPageRank")
sparkConf.set("spark.executor.memory", "60g")
sparkConf.set("spark.cores.max", "48")
sparkConf.set("spark.kryoserializer.buffer.mb", "24")
val sc = new SparkContext(sparkConf)
sc.addJar("~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar")

And I use spark-submit to run the application:

./bin/spark-submit --master spark://sing12:7077 --total-executor-cores 40 --executor-memory 40g --class org.apache.spark.examples.bagel.WikipediaPageRank ~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar hdfs://192.168.1.12:9000/freebase-26G 1 200 True

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com

On Wed, Jul 16, 2014 at 1:41 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

Are you using classes from external libraries that have not been added to the SparkContext, using sparkContext.addJar()?

TD

On Tue, Jul 15, 2014 at 8:36 PM, Hao Wang <wh.s...@gmail.com> wrote:

I am running the WikipediaPageRank Spark example and see the same problem as you:

14/07/16 11:31:06 DEBUG DAGScheduler: submitStage(Stage 6)
14/07/16 11:31:06 ERROR TaskSetManager: Task 6.0:450 failed 4 times; aborting job
14/07/16 11:31:06 INFO DAGScheduler: Failed to run foreach at Bagel.scala:251
Exception in thread "main" 14/07/16 11:31:06 INFO TaskSchedulerImpl: Cancelling stage 6
org.apache.spark.SparkException: Job aborted due to stage failure: Task 6.0:450 failed 4 times, most recent failure: Exception failure in TID 1330 on host sing11: com.esotericsoftware.kryo.KryoException: Unable to find class: arl Fridtjof Rode
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
com.twitter.chill.TraversableSerializer.read(Traversable.scala:44)
com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:115)
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)

Anyone could help?

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com

On Tue, Jun 3, 2014 at 8:02 PM, Denes <te...@outlook.com> wrote:

I tried to use Kryo as a serialiser in spark streaming, did everything according to the guide posted on the spark website, i.e. added the following lines:

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrator", "MyKryoRegistrator");

I also added the necessary classes to the MyKryoRegistrator. However I get the following strange error, can someone help me out where to look for a solution?

14/06/03 09:00:49 ERROR scheduler.JobScheduler: Error running job streaming job 140177880 ms.0
org.apache.spark.SparkException: Job aborted due
Re: Kryo deserialisation error
I am running the WikipediaPageRank Spark example and see the same problem as you:

14/07/16 11:31:06 DEBUG DAGScheduler: submitStage(Stage 6)
14/07/16 11:31:06 ERROR TaskSetManager: Task 6.0:450 failed 4 times; aborting job
14/07/16 11:31:06 INFO DAGScheduler: Failed to run foreach at Bagel.scala:251
Exception in thread "main" 14/07/16 11:31:06 INFO TaskSchedulerImpl: Cancelling stage 6
org.apache.spark.SparkException: Job aborted due to stage failure: Task 6.0:450 failed 4 times, most recent failure: Exception failure in TID 1330 on host sing11: com.esotericsoftware.kryo.KryoException: Unable to find class: arl Fridtjof Rode
com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
com.twitter.chill.TraversableSerializer.read(Traversable.scala:44)
com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:115)
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95)
org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582)

Anyone could help?

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com

On Tue, Jun 3, 2014 at 8:02 PM, Denes <te...@outlook.com> wrote:

I tried to use Kryo as a serialiser in spark streaming, did everything according to the guide posted on the spark website, i.e. added the following lines:

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrator", "MyKryoRegistrator");

I also added the necessary classes to the MyKryoRegistrator. However I get the following strange error, can someone help me out where to look for a solution?

14/06/03 09:00:49 ERROR scheduler.JobScheduler: Error running job streaming job 140177880 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while deserializing and fetching task: com.esotericsoftware.kryo.KryoException: Unable to find class: J
Serialization trace: id (org.apache.spark.storage.GetBlock)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Re: long GC pause during file.cache()
Hi, Wei

You may try to set JVM opts in spark-env.sh as follows to prevent or mitigate GC pauses:

export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC -Xmx2g -XX:MaxPermSize=256m"

There are more options you could add; please just Google :)

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com

On Sun, Jun 15, 2014 at 10:24 AM, Wei Tan <w...@us.ibm.com> wrote:

Hi,

I have a single node (192G RAM) stand-alone spark, with memory configuration like this in spark-env.sh:

SPARK_WORKER_MEMORY=180g
SPARK_MEM=180g

In spark-shell I have a program like this:

val file = sc.textFile("/localpath") // file size is 40G
file.cache()
val output = file.map(line => extract something from line)
output.saveAsTextFile(...)

When I run this program again and again, or keep trying file.unpersist() -> file.cache() -> output.saveAsTextFile(), the run time varies a lot, from 1 min to 3 min to 50+ min. Whenever the run time is more than 1 min, from the stage monitoring GUI I observe big GC pauses (some can be 10+ min). Of course when the run time is normal, say ~1 min, no significant GC is observed.

The behavior seems somewhat random. Is there any JVM tuning I should do to prevent this long GC pause from happening?

I used java-1.6.0-openjdk.x86_64, and my spark-shell process is something like this:

root 10994 1.7 0.6 196378000 1361496 pts/51 Sl+ 22:06 0:12 /usr/lib/jvm/java-1.6.0-openjdk.x86_64/bin/java -cp ::/home/wtan/scala/spark-1.0.0-bin-hadoop1/conf:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-core-3.2.2.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-rdbms-3.2.1.jar:/home/wtan/scala/spark-1.0.0-bin-hadoop1/lib/datanucleus-api-jdo-3.2.1.jar -XX:MaxPermSize=128m -Djava.library.path= -Xms180g -Xmx180g org.apache.spark.deploy.SparkSubmit spark-shell --class org.apache.spark.repl.Main

Best regards,
Wei

Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan
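Besides the GC flags, one lever that often helps in exactly this situation (a 40G text file cached as deserialized Java objects on a single huge heap) is caching the data in serialized form, which trades some CPU for far fewer long-lived objects that the collector has to trace. A hedged sketch, not taken from the thread; the paths are the placeholders from the original message:

import org.apache.spark.storage.StorageLevel

val file = sc.textFile("/localpath")

// MEMORY_ONLY_SER keeps each partition as one serialized byte buffer instead of
// millions of String objects, which sharply reduces GC pressure on very large heaps.
file.persist(StorageLevel.MEMORY_ONLY_SER)

val output = file.map(line => line /* extract something from line */)
output.saveAsTextFile("/localoutput")   // hypothetical output path

Registering Kryo for the cached types and keeping individual heaps well below 180g are other common levers; a single very large heap with the default collector is prone to exactly the multi-minute pauses described above.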
Akka listens to hostname while user may spark-submit with master in IP url
Hi, All

In Spark, spark.driver.host defaults to the driver's hostname, so the Akka actor system listens on a URL like akka.tcp://hostname:port. However, when a user runs an application with spark-submit, the user may set --master spark://192.168.1.12:7077. Then the AppClient in SparkDeploySchedulerBackend cannot successfully register with the Master, and the console prints:

WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

I think we need to improve this by making Akka recognise both the hostname and the corresponding IP, or at least add a line to the Spark documentation telling users not to use the IP. Any comments?

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com
BUG? Why does MASTER have to be set to spark://hostname:port?
Hi, all

When I try to run Spark PageRank using:

./bin/spark-submit \
  --master spark://192.168.1.12:7077 \
  --class org.apache.spark.examples.bagel.WikipediaPageRank \
  ~/Documents/Scala/WikiPageRank/target/scala-2.10/wikipagerank_2.10-1.0.jar \
  hdfs://192.168.1.12:9000/freebase-13G 0.05 100 True

(I don't specify the Spark master by SparkContext.setMaster() in the PageRank code.)

Unfortunately, it hangs here:

14/06/13 22:09:43 INFO DAGScheduler: Submitting 104 missing tasks from Stage 0 (MappedRDD[1] at textFile at WikipediaPageRank.scala:59)
14/06/13 22:09:43 INFO TaskSchedulerImpl: Adding task set 0.0 with 104 tasks
14/06/13 22:09:58 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

But after I change --master to hostname:7077, it works normally.

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com
Re: how to set spark.executor.memory and heap size
Hi, Laurent

You could set spark.executor.memory and the heap size by the following methods:

1. In your conf/spark-env.sh:

export SPARK_WORKER_MEMORY=38g
export SPARK_JAVA_OPTS="-XX:-UseGCOverheadLimit -XX:+UseConcMarkSweepGC -Xmx2g -XX:MaxPermSize=256m"

2. You could also set the executor memory and Java opts through spark-submit parameters.

Check the Spark configuration and tuning docs; you will find full answers there.

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com

On Thu, Jun 12, 2014 at 6:29 PM, Laurent T <laurent.thou...@ldmobile.net> wrote:

Hi,

Can you give us a little more insight on how you used that file to solve your problem? We're having the same OOM as you were and haven't been able to solve it yet.

Thanks
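For completeness, the same executor-memory setting can also be made per application rather than cluster-wide in spark-env.sh. A small sketch (the application name and the 4g value are arbitrary placeholders):

import org.apache.spark.{SparkConf, SparkContext}

// Per-application alternative to the spark-env.sh settings above.
val conf = new SparkConf()
  .setAppName("MemoryConfigExample")      // hypothetical application name
  .set("spark.executor.memory", "4g")     // heap given to each executor; 4g is a placeholder
val sc = new SparkContext(conf)

The command-line equivalent is the --executor-memory flag of spark-submit, as used in the submit commands elsewhere in this archive; note that the executor memory must fit inside SPARK_WORKER_MEMORY on each worker.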
Spark 1.0.0 Standalone AppClient cannot connect Master
Hi, all

Why does the official Spark 1.0.0 doc no longer explain how to build Spark against a specific Hadoop version? Does it mean that I don't need to specify the Hadoop version when I build Spark 1.0.0 with `sbt/sbt assembly`?

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com
Re: Spark 1.0.0 Standalone AppClient cannot connect Master
Hi, Andrew

Got it, thanks!

Hao

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com

On Fri, Jun 13, 2014 at 12:42 AM, Andrew Or <and...@databricks.com> wrote:

Hi Wang Hao,

This is not removed. We moved it here: http://spark.apache.org/docs/latest/hadoop-third-party-distributions.html

If you're building with SBT and you don't specify SPARK_HADOOP_VERSION, then it defaults to 1.0.4.

Andrew

2014-06-12 6:24 GMT-07:00 Hao Wang <wh.s...@gmail.com>:

Hi, all

Why does the official Spark 1.0.0 doc no longer explain how to build Spark against a specific Hadoop version? Does it mean that I don't need to specify the Hadoop version when I build Spark 1.0.0 with `sbt/sbt assembly`?

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com
Re: com.google.protobuf out of memory
Hi, Zuhair

According to my experience, you could try the following steps to avoid Spark OOM:

1. Increase the JVM memory by adding export SPARK_JAVA_OPTS="-Xmx2g"
2. Use .persist(storage.StorageLevel.MEMORY_AND_DISK) instead of .cache()
3. Have you set the spark.executor.memory value? It's 512m by default.
4. Add more memory to the workers.

I haven't tried to modify the spark.shuffle.memoryFraction value, but it is said to be the threshold that controls when shuffle contents are spilled to disk. I think you may decrease this value to mitigate the pressure on memory. (A short sketch of items 2 and 3 follows the quoted message below.)

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com

On Sun, May 25, 2014 at 4:15 PM, Zuhair Khayyat <zuhair.khay...@gmail.com> wrote:

Dear all,

I am getting an OutOfMemoryError in class ByteString.java from package com.google.protobuf when processing very large data using Spark 0.9. Does increasing spark.shuffle.memoryFraction help, or should I add more memory to my workers? Below is the error I get during execution.

14/05/25 07:26:05 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@cloud21:47985
14/05/25 07:26:05 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@cloud5:46977
14/05/25 07:26:05 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@cloud14:51948
14/05/25 07:26:05 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@cloud12:45368
14/05/25 07:26:05 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@cloud9:50926
14/05/25 07:26:05 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@cloud10:50690
14/05/25 07:26:12 ERROR ActorSystemImpl: Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-5] shutting down ActorSystem [spark]
java.lang.OutOfMemoryError: Java heap space
at com.google.protobuf_spark.ByteString.copyFrom(ByteString.java:90)
at com.google.protobuf_spark.ByteString.copyFrom(ByteString.java:99)
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
at akka.remote.EndpointWriter$$anonfun$akka$remote$EndpointWriter$$serializeMessage$1.apply(Endpoint.scala:672)
at akka.remote.EndpointWriter$$anonfun$akka$remote$EndpointWriter$$serializeMessage$1.apply(Endpoint.scala:672)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.remote.EndpointWriter.akka$remote$EndpointWriter$$serializeMessage(Endpoint.scala:671)
at akka.remote.EndpointWriter$$anonfun$7.applyOrElse(Endpoint.scala:559)
at akka.remote.EndpointWriter$$anonfun$7.applyOrElse(Endpoint.scala:544)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at akka.actor.FSM$class.processEvent(FSM.scala:595)
at akka.remote.EndpointWriter.processEvent(Endpoint.scala:443)
at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:589)
at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:583)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:385)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thank you,
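A minimal sketch of items 2 and 3 from the advice above (all values are placeholders; spark.shuffle.memoryFraction is only worth lowering if the shuffle itself is what runs out of memory):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("ProtobufShuffleExample")          // hypothetical application name
  .set("spark.executor.memory", "4g")            // 512m by default, per the advice above; 4g is a placeholder
  .set("spark.shuffle.memoryFraction", "0.2")    // lower values make shuffle data spill to disk sooner
val sc = new SparkContext(conf)

val data = sc.textFile("hdfs:///input")          // hypothetical input path
// MEMORY_AND_DISK lets partitions that do not fit in memory spill to disk instead of failing.
data.persist(StorageLevel.MEMORY_AND_DISK)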
For performance, Spark prefers OracleJDK or OpenJDK?
Hi, Oracle JDK and OpenJDK, which one is better or preferred for Spark? Regards, Wang Hao(王灏) CloudTeam | School of Software Engineering Shanghai Jiao Tong University Address:800 Dongchuan Road, Minhang District, Shanghai, 200240 Email:wh.s...@gmail.com
java.lang.NoClassDefFoundError: org/apache/spark/deploy/worker/Worker
Hi, all

Spark version: bae07e3 [behind 1] fix different versions of commons-lang dependency and apache/spark#746 addendum

I have six worker nodes, and four of them hit this NoClassDefFoundError when I use the start-slaves.sh script on my driver node. However, running ./bin/spark-class org.apache.spark.deploy.worker.Worker spark://MASTER_IP:PORT directly on the worker nodes works well. I compile the /spark directory on the driver node and distribute it to all the worker nodes. Paths on the different nodes are identical.

Here are the logs from one of the four failing nodes:

Spark Command: java -cp ::/home/wanghao/spark/conf:/home/wanghao/spark/assembly/target/scala-2.10/spark-assembly-1.0.0-SNAPSHOT-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://192.168.1.12:7077 --webui-port 8081

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/deploy/worker/Worker
Caused by: java.lang.ClassNotFoundException: org.apache.spark.deploy.worker.Worker
at java.net.URLClassLoader$1.run(URLClassLoader.java:217)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
Could not find the main class: org.apache.spark.deploy.worker.Worker. Program will exit.

Here is spark-env.sh:

export SPARK_WORKER_MEMORY=1g
export SPARK_MASTER_IP=192.168.1.12
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=2

hosts file:

127.0.0.1 localhost
192.168.1.12 sing12

# The following lines are desirable for IPv6 capable hosts
::1 ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters

192.168.1.11 sing11
192.168.1.59 sing59

###
# failed machines
###
192.168.1.122 host122
192.168.1.123 host123
192.168.1.124 host124
192.168.1.125 host125

Regards,
Wang Hao(王灏)
CloudTeam | School of Software Engineering
Shanghai Jiao Tong University
Address: 800 Dongchuan Road, Minhang District, Shanghai, 200240
Email: wh.s...@gmail.com