RE: Spark streaming on standalone cluster
Spark Streaming needs at least two threads on the worker/slave side. I have seen this issue when (to test the behavior) I set the thread count for Spark Streaming to 1. It should be at least 2: one for the receiver adapter (Kafka, Flume, etc.) and the second for processing the data. But I tested that in local mode: “--master local[2]”. The same issue could happen in a worker also. If you set “--master local[1]” the streaming worker/slave blocks due to starvation. Which conf parameter sets the worker thread count in cluster mode? Is it spark.akka.threads? From: Tathagata Das [mailto:t...@databricks.com] Sent: 01 July 2015 01:32 To: Borja Garrido Bear Cc: user Subject: Re: Spark streaming on standalone cluster How many receivers do you have in the streaming program? You have to have more cores reserved by your Spark application than the number of receivers. That would explain receiving the output only after stopping. TD On Tue, Jun 30, 2015 at 7:59 AM, Borja Garrido Bear kazebo...@gmail.com wrote: Hi all, I'm running a Spark standalone cluster with one master and one slave (different machines, both on version 1.4.0). The thing is I have a Spark Streaming job that gets data from Kafka and then just prints it. To configure the cluster I just started the master and then the slaves pointing to it; as everything appears in the web interface I assumed everything was fine, but maybe I missed some configuration. When I run it locally there is no problem, it works. When I run it in the cluster the worker state appears as loading - If the job is a Scala one, when I stop it I receive all the output - If the job is Python, when I stop it I receive a bunch of these exceptions \\\ ERROR JobScheduler: Error running job streaming job 143567542 ms.0 py4j.Py4JException: An exception was raised by the Python Proxy.
Return Message: null at py4j.Protocol.getReturnValue(Protocol.java:417) at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:113) at com.sun.proxy.$Proxy14.call(Unknown Source) at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:63) at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:156) at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:156) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:193) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:192) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) \\\ Is there any known issue with spark streaming and the standalone mode? or with Python?
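A minimal sketch of the two-thread requirement in practice (Scala; the ZooKeeper address, consumer group and topic are placeholders, and spark-streaming-kafka is assumed to be on the classpath):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaPrint {
  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the Kafka receiver, one for processing.
    // With local[1] (or a one-core worker) the receiver occupies the only
    // slot and the print jobs starve, matching the behaviour described above.
    val conf = new SparkConf().setAppName("kafka-print").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val lines = KafkaUtils
      .createStream(ssc, "zkhost:2181", "print-group", Map("mytopic" -> 1))
      .map(_._2)
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

In standalone mode the equivalent requirement is that the application is granted more cores than it runs receivers, so a one-core worker shows the same starvation.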
Re: import errors with Eclipse Scala
in eclipse you can just add the spark assembly jar to the build path: right-click the project > Build Path > Configure Build Path > Libraries > Add External JARs On Wed, Jul 1, 2015 at 7:15 PM Stefan Panayotov spanayo...@msn.com wrote: Hi Ted, How can I import the relevant Spark projects into Eclipse? Do I need to add anything to the Java Build Path in the project properties? Also, I have installed sbt on my machine. Is there a corresponding sbt command to the maven command below? Stefan Panayotov, PhD Home: 610-355-0919 Cell: 610-517-5586 email: spanayo...@msn.com spanayo...@outlook.com spanayo...@comcast.net -- Date: Wed, 1 Jul 2015 10:04:23 -0700 Subject: Re: import errors with Eclipse Scala From: yuzhih...@gmail.com To: spanayo...@msn.com CC: user@spark.apache.org Have you imported the relevant Spark projects into Eclipse? You can run a command similar to the following to generate project files for Spark: mvn clean package -DskipTests eclipse:eclipse On Wed, Jul 1, 2015 at 9:57 AM, Stefan Panayotov spanayo...@msn.com wrote: Hi Team, Just installed Eclipse with the Scala plugin to benefit from an IDE environment and I faced the problem that any import statement gives me an error. For example: import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql._ import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods import org.json4s.jackson.JsonMethods._ All report errors of type: “object apache is not a member of package org” or “object json4s is not a member of package org” How can I resolve this? Thanks, Stefan Panayotov, PhD email: spanayo...@msn.com spanayo...@outlook.com spanayo...@comcast.net
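On the sbt side, a minimal sketch of a build definition that makes those imports resolve (versions are illustrative, and the sbteclipse plugin is assumed for generating Eclipse project files):

// build.sbt
name := "spark-eclipse-test"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"     % "1.4.0" % "provided",
  "org.apache.spark" %% "spark-sql"      % "1.4.0" % "provided",
  "org.apache.spark" %% "spark-hive"     % "1.4.0" % "provided",
  "org.json4s"       %% "json4s-jackson" % "3.2.10"
)

With addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0") in project/plugins.sbt, running sbt eclipse generates the .classpath and .project files, playing roughly the role that mvn eclipse:eclipse plays for the Spark sources themselves.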
Re: import errors with Eclipse Scala
Have you imported the relevant Spark projects into Eclipse? You can run a command similar to the following to generate project files for Spark: mvn clean package -DskipTests eclipse:eclipse On Wed, Jul 1, 2015 at 9:57 AM, Stefan Panayotov spanayo...@msn.com wrote: Hi Team, Just installed Eclipse with the Scala plugin to benefit from an IDE environment and I faced the problem that any import statement gives me an error. For example: import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql._ import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods import org.json4s.jackson.JsonMethods._ All report errors of type: “object apache is not a member of package org” or “object json4s is not a member of package org” How can I resolve this? Thanks, Stefan Panayotov, PhD email: spanayo...@msn.com spanayo...@outlook.com spanayo...@comcast.net
Task InputSize source code location
Hi, When running tasks, I found that some tasks have an input size of zero while others do not. For example, in this picture: http://snag.gy/g6iJX.jpg I suspect it has something to do with the block manager. But where is the exact source code that monitors the task input size? Thanks.
making dataframe for different types using spark-csv
Hi experts! I am using spark-csv to load CSV data into a dataframe. By default it makes the type of each column a string. Is there some way to get a dataframe with the actual types, like int, double, etc.? Thanks
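A short Scala sketch of what usually works with the spark-csv package (the path is a placeholder; inferSchema asks the reader to sample the data and pick int/double/etc. instead of defaulting every column to string):

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")       // first line holds the column names
  .option("inferSchema", "true")  // sample the file and infer numeric types
  .load("hdfs:///path/to/data.csv")

df.printSchema() // columns should now show int/double where appropriate

If the inference pass is too expensive, an explicit StructType can be supplied with .schema(...) before load instead.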
Spark Standalone Cluster - Slave not connecting to Master
I am trying to setup a Spark standalone cluster following the official documentation. My master is on a local vm running ubuntu and I also have one worker running in the same machine. It is connecting and I am able to see its status in the WebUI of the master. But when I try to connect a slave from another machine, I am not able to do it. This is the log message I get in the worker when I start from another machine. I have tried using start-slaves.sh from the master after updating conf\slaves and also start-slave.sh spark://spark:7077 from the slave - 15/07/01 11:54:16 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkMaster@spark:7077] has failed, address is now gated for [5000] ms. Reason is: [Association failed with [akka.tcp://sparkMaster@spark:7077]]. 15/07/01 11:54:59 ERROR Worker: All masters are unresponsive! Giving up. 15/07/01 11:54:59 INFO Utils: Shutdown hook called When I try to telnet from the slave to the master, this is what I get - root@worker:~# telnet spark 7077 Trying 10.xx.xx.xx... Connected to spark. Escape character is '^]'. Connection closed by foreign host. Telnet seems to work but the connection is closed as soon as it is established. Could this have something to do with the problem ? I have added the master and slave IP addresses in /etc/hosts on both machines. I have the following config set in spark-env.sh in both machines - export SPARK_MASTER_IP=spark export SPARK_WORKER_PORT=4 I am not able to find out the reason for the slave not getting connected. Is there some configuration that I am missing ? Thanks.
RE: import errors with Eclipse Scala
Thanks, Jem. I added scala-compiler.jar from C:\Eclipse\eclipse\plugins\org.scala-ide.scala210.jars_4.1.0.201505250838\target\jars And looks like this resolved the issue. Thanks once again. Stefan Panayotov, PhD Home: 610-355-0919 Cell: 610-517-5586 email: spanayo...@msn.com spanayo...@outlook.com spanayo...@comcast.net From: jem.tuc...@gmail.com Date: Wed, 1 Jul 2015 18:20:51 + Subject: Re: import errors with Eclipse Scala To: spanayo...@msn.com; yuzhih...@gmail.com CC: user@spark.apache.org in eclipse you can just add the spark assembly jar to the build path, right click the project build path configure build path library add external jars On Wed, Jul 1, 2015 at 7:15 PM Stefan Panayotov spanayo...@msn.com wrote: Hi Ted, How can I import the relevant Spark projects into Eclipse? Do I need to add anything the Java Build Path in the project properties? Also, I have installed sbt on my machine. Is there a corresponding sbt command to the maven command below? Stefan Panayotov, PhD Home: 610-355-0919 Cell: 610-517-5586 email: spanayo...@msn.com spanayo...@outlook.com spanayo...@comcast.net Date: Wed, 1 Jul 2015 10:04:23 -0700 Subject: Re: import errors with Eclipse Scala From: yuzhih...@gmail.com To: spanayo...@msn.com CC: user@spark.apache.org Have you imported the relevant Spark projects into Eclipse. You can run command similar to the following to generate project files for Spark: mvn clean package -DskipTests eclipse:eclipse On Wed, Jul 1, 2015 at 9:57 AM, Stefan Panayotov spanayo...@msn.com wrote: Hi Team, Just installed Eclipse with Scala plugin to benefit from IDE environment and I faced the problem that any import statement gives me an error. For example: import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql._ import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods import org.json4s.jackson.JsonMethods._ All report errors of type: “object apache is not member of package org” or “object json4s is not member of package org” How can I resolve this? Thanks, Stefan Panayotov, PhD email: spanayo...@msn.com spanayo...@outlook.com spanayo...@comcast.net
RE: import errors with Eclipse Scala
Hi Ted, How can I import the relevant Spark projects into Eclipse? Do I need to add anything the Java Build Path in the project properties? Also, I have installed sbt on my machine. Is there a corresponding sbt command to the maven command below? Stefan Panayotov, PhD Home: 610-355-0919 Cell: 610-517-5586 email: spanayo...@msn.com spanayo...@outlook.com spanayo...@comcast.net Date: Wed, 1 Jul 2015 10:04:23 -0700 Subject: Re: import errors with Eclipse Scala From: yuzhih...@gmail.com To: spanayo...@msn.com CC: user@spark.apache.org Have you imported the relevant Spark projects into Eclipse. You can run command similar to the following to generate project files for Spark: mvn clean package -DskipTests eclipse:eclipse On Wed, Jul 1, 2015 at 9:57 AM, Stefan Panayotov spanayo...@msn.com wrote: Hi Team, Just installed Eclipse with Scala plugin to benefit from IDE environment and I faced the problem that any import statement gives me an error. For example: import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql._ import org.json4s._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods import org.json4s.jackson.JsonMethods._ All report errors of type: “object apache is not member of package org” or “object json4s is not member of package org” How can I resolve this? Thanks, Stefan Panayotov, PhD email: spanayo...@msn.com spanayo...@outlook.com spanayo...@comcast.net
BroadCast Multiple DataFrame ( JDBC Tables )
Hi, I need to load 10 tables in memory and have them available to all the workers. Please let me know what is the best way to broadcast them; sc.broadcast(df) allows only one. Thanks,
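A sketch in Scala of one way to do this, assuming every lookup table is small enough to collect on the driver (the JDBC URL and table names are placeholders); nothing prevents calling sc.broadcast once per table and keeping the handles in a map:

import java.util.Properties
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.Row

val jdbcUrl = "jdbc:sqlserver://dbhost;databaseName=mydb"   // placeholder
val smallTables = Seq("dim_country", "dim_product")         // placeholders

// One broadcast variable per table: collect the rows on the driver,
// then ship them once to every executor.
val lookups: Map[String, Broadcast[Array[Row]]] =
  smallTables.map { t =>
    val df = sqlContext.read.jdbc(jdbcUrl, t, new Properties())
    t -> sc.broadcast(df.collect())
  }.toMap

// Inside task closures, read lookups("dim_country").value as needed.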
Re: coalesce on dataFrame
PySpark or Spark (Scala)? When you use coalesce with anything but a column you must use a literal, like this in PySpark: from pyspark.sql import functions as F F.coalesce(df.a, F.lit(True)) On Wed, 1 Jul 2015 at 12:03, Ewan Leith ewan.le...@realitymine.com wrote: It's in spark 1.4.0, or should be at least: https://issues.apache.org/jira/browse/SPARK-6972 Ewan -Original Message- From: Hafiz Mujadid [mailto:hafizmujadi...@gmail.com] Sent: 01 July 2015 08:23 To: user@spark.apache.org Subject: coalesce on dataFrame How can we use coalesce(1, true) on dataFrame? Thanks
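Note that two different things are called coalesce here: F.coalesce above is the SQL function (first non-null value), while SPARK-6972 is about reducing the number of partitions. For the partition sense, a short Scala sketch of what Spark 1.4 allows (the output path is a placeholder):

// DataFrame.coalesce(n) arrived in 1.4 (SPARK-6972); it narrows to n
// partitions without a shuffle, e.g. to end up with a single output file:
val single = df.coalesce(1)
single.write.parquet("hdfs:///out/single-file")

// There is no coalesce(1, true) overload on DataFrame; for the shuffling
// variant, drop down to the underlying RDD:
val reshuffled = df.rdd.coalesce(1, shuffle = true)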
custom RDD in java
Hi, Is it possible to write a custom RDD in Java? The requirement is: I have a list of SQL Server tables that need to be dumped into HDFS. So I have a List<String> tables = {dbname.tablename, dbname.tablename2, ...}; then JavaRDD<String> rdd = javasparkcontext.parallelize(tables); JavaRDD<String> tablecontent = rdd.map(new Function<String, Iterable<String>>(){fetch table and return populated iterable}); tablecontent.saveAsTextFile(hdfs path); In rdd.map(new Function<String, ...>) I cannot keep the complete table content in memory, so I want to create my own RDD to handle it. Thanks Shushant
Re: Can a Spark Driver Program be a REST Service by itself?
You can try using Spark Jobserver https://github.com/spark-jobserver/spark-jobserver On Wed, Jul 1, 2015 at 4:32 PM, Spark Enthusiast sparkenthusi...@yahoo.in wrote: Folks, My Use case is as follows: My Driver program will be aggregating a bunch of Event Streams and acting on it. The Action on the aggregated events is configurable and can change dynamically. One way I can think of is to run the Spark Driver as a Service where a config push can be caught via an API that the Driver exports. Can I have a Spark Driver Program run as a REST Service by itself? Is this a common use case? Is there a better way to solve my problem? Thanks -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
Making Unpersist Lazy
Hi, The current behavior of rdd.unpersist() appears to not be lazily executed and therefore must be placed after an action. Is there any way to emulate lazy execution of this function so it is added to the task queue? Thanks, Jem
Passing name of package in sparkR.init()
Hi, What is the right way to pass a package name in sparkR.init()? I can successfully pass the package name if I'm using the sparkR shell, by using --packages while invoking sparkR. However, if I'm trying to use SparkR from RStudio and need to pass a package name in sparkR.init(), I am not sure how to do that. Regards, Sourav
Illegal access error when initializing SparkConf
Team, I'm just playing around with Spark and MLlib. Installed Scala and Spark, versions mentioned below. Scala - 2.11.7 Spark - 1.4.0 (Did an mvn package with -Dscala-2.11) I'm trying to run the Java classification and clustering examples that came along with the documentation. However, I'm getting the illegal access error when I'm trying to initialize the SparkConf object. Please find the error trace below: Exception in thread "main" java.lang.IllegalAccessError: tried to access method scala.collection.mutable.HashSet.<init>()V from class org.apache.spark.util.Utils$ at org.apache.spark.util.Utils$.<init>(Utils.scala:195) at org.apache.spark.util.Utils$.<clinit>(Utils.scala) at org.apache.spark.SparkConf.<init>(SparkConf.scala:58) at MultinomialLogisticRegressionExample.main(MultinomialLogisticRegressionExample.java:15) How do I go about this? Did some googling and couldn't conclude. Please help and thank you in advance. -- With Thanks and Regards, Ramprakash Ramamoorthy, India.
Re: Spark streaming on standalone cluster
Hi, https://spark.apache.org/docs/latest/streaming-programming-guide.html Points to remember - When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using an input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL, where n > the number of receivers to run (see Spark Properties https://spark.apache.org/docs/latest/configuration.html#spark-properties.html for information on how to set the master). On Wed, 1 Jul 2015 at 11:25, Borja Garrido Bear kazebo...@gmail.com wrote: Hi all, Thanks for the answers, yes, my problem was I was using just one worker with one core, so it was starving and then I never got the job to run; now it seems it's working properly. One question, is this information in the docs? (because maybe I misread it) On Wed, Jul 1, 2015 at 10:30 AM, prajod.vettiyat...@wipro.com wrote: Spark Streaming needs at least two threads on the worker/slave side. I have seen this issue when (to test the behavior) I set the thread count for Spark Streaming to 1. It should be at least 2: one for the receiver adapter (Kafka, Flume, etc.) and the second for processing the data. But I tested that in local mode: “--master local[2]”. The same issue could happen in a worker also. If you set “--master local[1]” the streaming worker/slave blocks due to starvation. Which conf parameter sets the worker thread count in cluster mode? Is it spark.akka.threads? From: Tathagata Das [mailto:t...@databricks.com] Sent: 01 July 2015 01:32 To: Borja Garrido Bear Cc: user Subject: Re: Spark streaming on standalone cluster How many receivers do you have in the streaming program? You have to have more cores reserved by your Spark application than the number of receivers. That would explain receiving the output only after stopping. TD On Tue, Jun 30, 2015 at 7:59 AM, Borja Garrido Bear kazebo...@gmail.com wrote: Hi all, I'm running a Spark standalone cluster with one master and one slave (different machines, both on version 1.4.0). The thing is I have a Spark Streaming job that gets data from Kafka and then just prints it. To configure the cluster I just started the master and then the slaves pointing to it; as everything appears in the web interface I assumed everything was fine, but maybe I missed some configuration. When I run it locally there is no problem, it works. When I run it in the cluster the worker state appears as loading - If the job is a Scala one, when I stop it I receive all the output - If the job is Python, when I stop it I receive a bunch of these exceptions \\\ ERROR JobScheduler: Error running job streaming job 143567542 ms.0 py4j.Py4JException: An exception was raised by the Python Proxy. 
Return Message: null at py4j.Protocol.getReturnValue(Protocol.java:417) at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:113) at com.sun.proxy.$Proxy14.call(Unknown Source) at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:63) at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:156) at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:156) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:193) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193) at
Re: Difference between spark-defaults.conf and SparkConf.set
Thanks. Without spark-submit it seems the more straightforward solution is to just pass it on the driver's classpath. I was more surprised that the same conf parameter had different behavior depending on where it's specified: program vs spark-defaults. I'm all set now - thanks for replying. Original message: From: Akhil Das ak...@sigmoidanalytics.com Date: 07/01/2015 2:27 AM (GMT-05:00) To: Yana Kadiyska yana.kadiy...@gmail.com Cc: user@spark.apache.org Subject: Re: Difference between spark-defaults.conf and SparkConf.set .addJar works for me when I run it as a stand-alone application (without using spark-submit) Thanks Best Regards On Tue, Jun 30, 2015 at 7:47 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi folks, running into a pretty strange issue: I'm setting spark.executor.extraClassPath and spark.driver.extraClassPath to point to some external JARs. If I set them in spark-defaults.conf everything works perfectly. However, if I remove spark-defaults.conf and just create a SparkConf and call .set("spark.executor.extraClassPath", ...) .set("spark.driver.extraClassPath", ...) I get ClassNotFound exceptions from Hadoop Conf: Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.ceph.CephFileSystem not found at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1493) at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1585) This seems like a bug to me -- or does spark-defaults.conf somehow get processed differently? I have dumped out sparkConf.toDebugString and in both cases (spark-defaults.conf/in code sets) it seems to have the same values in it...
Re: Spark streaming on standalone cluster
Hi all, Thanks for the answers, yes, my problem was I was using just one worker with one core, so it was starving and then I never get the job to run, now it seems it's working properly. One question, is this information in the docs? (because maybe I misread it) On Wed, Jul 1, 2015 at 10:30 AM, prajod.vettiyat...@wipro.com wrote: Spark streaming needs at least two threads on the worker/slave side. I have seen this issue when(to test the behavior), I set the thread count for spark streaming to 1. It should be atleast 2: one for the receiver adapter(kafka, flume etc) and the second for processing the data. But I tested that in local mode: “--master local[2] “. The same issue could happen in worker also. If you set “--master local[1] “ the streaming worker/slave blocks due to starvation. Which conf parameter sets the worker thread count in cluster mode ? is it spark.akka.threads ? *From:* Tathagata Das [mailto:t...@databricks.com] *Sent:* 01 July 2015 01:32 *To:* Borja Garrido Bear *Cc:* user *Subject:* Re: Spark streaming on standalone cluster How many receivers do you have in the streaming program? You have to have more numbers of core in reserver by your spar application than the number of receivers. That would explain the receiving output after stopping. TD On Tue, Jun 30, 2015 at 7:59 AM, Borja Garrido Bear kazebo...@gmail.com wrote: Hi all, I'm running a spark standalone cluster with one master and one slave (different machines and both in version 1.4.0), the thing is I have a spark streaming job that gets data from Kafka, and the just prints it. To configure the cluster I just started the master and then the slaves pointing to it, as everything appears in the web interface I assumed everything was fine, but maybe I missed some configuration. When I run it locally there is no problem, it works. When I run it in the cluster the worker state appears as loading - If the job is a Scala one, when I stop it I receive all the output - If the job is Python, when I stop it I receive a bunch of these exceptions \\\ ERROR JobScheduler: Error running job streaming job 143567542 ms.0 py4j.Py4JException: An exception was raised by the Python Proxy. 
Return Message: null at py4j.Protocol.getReturnValue(Protocol.java:417) at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:113) at com.sun.proxy.$Proxy14.call(Unknown Source) at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:63) at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:156) at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:156) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:193) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:193) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:192) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) \\\ Is there any known issue with spark streaming and the standalone mode? or with Python?
Can a Spark Driver Program be a REST Service by itself?
Folks, My use case is as follows: My Driver program will be aggregating a bunch of Event Streams and acting on them. The action on the aggregated events is configurable and can change dynamically. One way I can think of is to run the Spark Driver as a Service where a config push can be caught via an API that the Driver exports. Can I have a Spark Driver program run as a REST Service by itself? Is this a common use case? Is there a better way to solve my problem? Thanks
Re: Issue with parquet write after join (Spark 1.4.0)
Join is happening successfully as I am able to do count() after the join. Error is coming only while trying to write in parquet format on hdfs. Thanks, Pooja. On Wed, Jul 1, 2015 at 1:06 PM, Akhil Das ak...@sigmoidanalytics.com wrote: It says: Caused by: java.net.ConnectException: Connection refused: slave2/...:54845 Could you look in the executor logs (stderr on slave2) and see what made it shut down? Since you are doing a join there's a high possibility of OOM etc. Thanks Best Regards On Wed, Jul 1, 2015 at 10:20 AM, Pooja Jain pooja.ja...@gmail.com wrote: Hi, We are using Spark 1.4.0 on hadoop using yarn-cluster mode via spark-submit. We are facing parquet write issue after doing dataframe joins We have a full data set and then an incremental data. We are reading them as dataframes, joining them, and then writing the data to the hdfs system in parquet format. We are getting the timeout error on the last partition. But if we do a count on the joined data it is working - which gives us the confidence that join is happening properly. Only in case of writing to the hdfs it is timing out. Code flow: // join two data frames - dfBase and dfIncr on primaryKey val joinedDF = dfBase.join(dfIncr, dfBase(primaryKey) === dfIncr(primaryKey), outer) // applying a reduce function on each row. val mergedDF = joinedDF.map(x = reduceFunc(x) ) //converting back to dataframe val newdf = Spark.getSqlContext().createDataFrame(mergedDF, dfSchema) //writing to parquet file newdf.write.parquet(hdfsfilepath) Getting following exception: 15/06/30 22:47:04 WARN spark.HeartbeatReceiver: Removing executor 26 with no recent heartbeats: 255766 ms exceeds timeout 24 ms 15/06/30 22:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 26 on slave2: Executor heartbeat timed out after 255766 ms 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 from TaskSet 7.0 15/06/30 22:47:04 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 7.0 (TID 216, slave2): ExecutorLostFailure (executor 26 lost) 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Starting task 6.1 in stage 7.0 (TID 310, slave2, PROCESS_LOCAL, 1910 bytes) 15/06/30 22:47:04 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 3) 15/06/30 22:47:04 INFO cluster.YarnClusterSchedulerBackend: Requesting to kill executor(s) 26 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 26 from BlockManagerMaster. 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(26, slave2, 54845) 15/06/30 22:47:04 INFO storage.BlockManagerMaster: Removed 26 successfully in removeExecutor 15/06/30 22:47:04 INFO yarn.YarnAllocator: Driver requested a total number of 26 executor(s). 15/06/30 22:47:04 INFO scheduler.ShuffleMapStage: ShuffleMapStage 6 is now unavailable on executor 26 (193/200, false) 15/06/30 22:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver requested to kill executor(s) 26. 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. slave2:51849 15/06/30 22:47:06 ERROR cluster.YarnClusterScheduler: Lost executor 26 on slave2: remote Rpc client disassociated 15/06/30 22:47:06 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 from TaskSet 7.0 15/06/30 22:47:06 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 5) 15/06/30 22:47:06 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 26 from BlockManagerMaster. 
15/06/30 22:47:06 INFO storage.BlockManagerMaster: Removed 26 successfully in removeExecutor 15/06/30 22:47:06 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@slave2:51849] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. slave2:51849 15/06/30 22:47:21 WARN scheduler.TaskSetManager: Lost task 6.1 in stage 7.0 (TID 310, slave2): org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by:
RE: coalesce on dataFrame
It's in spark 1.4.0, or should be at least: https://issues.apache.org/jira/browse/SPARK-6972 Ewan -Original Message- From: Hafiz Mujadid [mailto:hafizmujadi...@gmail.com] Sent: 01 July 2015 08:23 To: user@spark.apache.org Subject: coalesce on dataFrame How can we use coalesce(1, true) on dataFrame? Thanks
Json Dataframe formation and Querying
Hi, I am creating a DataFrame from a JSON file and the schema of the JSON, as truly depicted by dataframe.printSchema(), is:

root
 |-- 1-F2: struct (nullable = true)
 |    |-- A: string (nullable = true)
 |    |-- B: string (nullable = true)
 |    |-- C: string (nullable = true)
 |-- 10-C4: struct (nullable = true)
 |    |-- A: string (nullable = true)
 |    |-- D: string (nullable = true)
 |    |-- E: string (nullable = true)
 |-- 11-B5: struct (nullable = true)
 |    |-- A: string (nullable = true)
 |    |-- D: string (nullable = true)
 |    |-- F: string (nullable = true)
 |    |-- G: string (nullable = true)

In the above schema the struct type elements {1-F2; 10-C4; 11-B5} are dynamic. This kind of dynamic schema can be easily parsed by any parser (e.g. Gson, Jackson), and a Map type structure makes it easy to query back and transform, but in Spark 1.4 how should I query back using a construct like: dataframe.select([0]).show() -- an index-based query? I tried to save it as a table and then tried to describe it back using the spark-sql REPL, but it is unable to find my table. What is the preferred way to deal with this type of use case in Spark? Regards, Umesh Chaudhary
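A short Scala sketch of one way to get the index-based effect with the DataFrame API, using the schema itself as the lookup table (backtick quoting is assumed to work for the dashed column names in 1.4's expression parser):

// Field names in schema order, e.g. Array("1-F2", "10-C4", "11-B5")
val fields = df.schema.fieldNames

// Pick a struct column by position, then drill into its nested attributes.
val first = fields(0)
df.selectExpr(s"`$first`.A", s"`$first`.B", s"`$first`.C").show()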
Re: s3 bucket access/read file
s3a uses Amazon's own libraries; it's tested against Frankfurt too. You have to view s3a support in Hadoop 2.6 as a beta release: it works, with some issues. Hadoop 2.7.0+ has it all working now, though you are left with the task of getting hadoop-aws and the Amazon JAR onto your classpath via the --jars option, as they aren't in the spark-assembly JAR. On 1 Jul 2015, at 04:46, Aaron Davidson ilike...@gmail.com wrote: Should be able to use s3a (on new hadoop versions), I believe that will try or at least has a setting for v4 On Tue, Jun 30, 2015 at 8:31 PM, Exie tfind...@prodevelop.com.au wrote: Not sure if this helps, but the options I set are slightly different: val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.s3n.awsAccessKeyId", key) hadoopConf.set("fs.s3n.awsSecretAccessKey", secret) Try setting them to s3n as opposed to just s3. Good luck!
Issues when saving dataframe in Spark 1.4 with parquet format
Hi chaps, It seems there is an issue while saving dataframes in Spark 1.4. The default file extension inside Hive warehouse folder is now part-r-X.gz.parquet but while running queries from SparkSQL Thriftserver is still looking for part-r-X.parquet. Is there any config parameter we can use as workaround? Is there any Jira opened about the same? Am I missing anything if I upgraded from Spark 1.3 to 1.4? The only similar reference I have seen is this: http://mail-archives.apache.org/mod_mbox/spark-user/201506.mbox/%3ccahp0wa+japfvj+pc2mzwomzb+mmdozfbr-xaxdbkoppe68t...@mail.gmail.com%3E Thanks.
Re: Check for null in PySpark DataFrame
I must admit I've been using the same back-to-SQL strategy for now :p So I'd be glad to have insights into that too. On Tue, 30 Jun 2015 at 23:28, pedro ski.rodrig...@gmail.com wrote: I am trying to find the correct way to programmatically check for null values for rows in a dataframe. For example, below is the code using pyspark and sql: df = sqlContext.createDataFrame(sc.parallelize([(1, None), (2, "a"), (3, "b"), (4, None)])) df.where('_2 is not null').count() However, this won't work: df.where(df._2 != None).count() It seems there is no native Python way with DataFrames to do this, but I find that difficult to believe and more likely that I am missing the right way to do this.
Re: StorageLevel.MEMORY_AND_DISK_SER
So do you want to change the behavior of persist api or write the rdd on disk... On Jul 1, 2015 9:13 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I think i want to use persist then and write my intermediate RDDs to disk+mem. On Wed, Jul 1, 2015 at 8:28 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: I think persist api is internal to rdd whereas write api is for saving content on dist. Rdd persist will dump your obj bytes serialized on the disk.. If you wanna change that behavior you need to override the class serialization that your are storing in rdd.. On Jul 1, 2015 8:50 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: This is my write API. how do i integrate it here. protected def writeOutputRecords(detailRecords: RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) { val writeJob = new Job() val schema = SchemaUtil.outputSchema(_detail) AvroJob.setOutputKeySchema(writeJob, schema) val outputRecords = detailRecords.coalesce(100) outputRecords.saveAsNewAPIHadoopFile(outputDir, classOf[AvroKey[GenericRecord]], classOf[org.apache.hadoop.io.NullWritable], classOf[AvroKeyOutputFormat[GenericRecord]], writeJob.getConfiguration) } On Wed, Jul 1, 2015 at 8:11 AM, Koert Kuipers ko...@tresata.com wrote: rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) On Wed, Jul 1, 2015 at 11:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: How do i persist an RDD using StorageLevel.MEMORY_AND_DISK_SER ? -- Deepak -- Deepak -- Deepak
Re: Issue with parquet write after join (Spark 1.4.0)
By any chance, are you using time field in your df. Time fields are known to be notorious in rdd conversion. On Jul 1, 2015 6:13 PM, Pooja Jain pooja.ja...@gmail.com wrote: Join is happening successfully as I am able to do count() after the join. Error is coming only while trying to write in parquet format on hdfs. Thanks, Pooja. On Wed, Jul 1, 2015 at 1:06 PM, Akhil Das ak...@sigmoidanalytics.com wrote: It says: Caused by: java.net.ConnectException: Connection refused: slave2/...:54845 Could you look in the executor logs (stderr on slave2) and see what made it shut down? Since you are doing a join there's a high possibility of OOM etc. Thanks Best Regards On Wed, Jul 1, 2015 at 10:20 AM, Pooja Jain pooja.ja...@gmail.com wrote: Hi, We are using Spark 1.4.0 on hadoop using yarn-cluster mode via spark-submit. We are facing parquet write issue after doing dataframe joins We have a full data set and then an incremental data. We are reading them as dataframes, joining them, and then writing the data to the hdfs system in parquet format. We are getting the timeout error on the last partition. But if we do a count on the joined data it is working - which gives us the confidence that join is happening properly. Only in case of writing to the hdfs it is timing out. Code flow: // join two data frames - dfBase and dfIncr on primaryKey val joinedDF = dfBase.join(dfIncr, dfBase(primaryKey) === dfIncr(primaryKey), outer) // applying a reduce function on each row. val mergedDF = joinedDF.map(x = reduceFunc(x) ) //converting back to dataframe val newdf = Spark.getSqlContext().createDataFrame(mergedDF, dfSchema) //writing to parquet file newdf.write.parquet(hdfsfilepath) Getting following exception: 15/06/30 22:47:04 WARN spark.HeartbeatReceiver: Removing executor 26 with no recent heartbeats: 255766 ms exceeds timeout 24 ms 15/06/30 22:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 26 on slave2: Executor heartbeat timed out after 255766 ms 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 from TaskSet 7.0 15/06/30 22:47:04 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 7.0 (TID 216, slave2): ExecutorLostFailure (executor 26 lost) 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Starting task 6.1 in stage 7.0 (TID 310, slave2, PROCESS_LOCAL, 1910 bytes) 15/06/30 22:47:04 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 3) 15/06/30 22:47:04 INFO cluster.YarnClusterSchedulerBackend: Requesting to kill executor(s) 26 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 26 from BlockManagerMaster. 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(26, slave2, 54845) 15/06/30 22:47:04 INFO storage.BlockManagerMaster: Removed 26 successfully in removeExecutor 15/06/30 22:47:04 INFO yarn.YarnAllocator: Driver requested a total number of 26 executor(s). 15/06/30 22:47:04 INFO scheduler.ShuffleMapStage: ShuffleMapStage 6 is now unavailable on executor 26 (193/200, false) 15/06/30 22:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver requested to kill executor(s) 26. 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. 
slave2:51849 15/06/30 22:47:06 ERROR cluster.YarnClusterScheduler: Lost executor 26 on slave2: remote Rpc client disassociated 15/06/30 22:47:06 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 from TaskSet 7.0 15/06/30 22:47:06 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 5) 15/06/30 22:47:06 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 26 from BlockManagerMaster. 15/06/30 22:47:06 INFO storage.BlockManagerMaster: Removed 26 successfully in removeExecutor 15/06/30 22:47:06 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@slave2:51849] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. slave2:51849 15/06/30 22:47:21 WARN scheduler.TaskSetManager: Lost task 6.1 in stage 7.0 (TID 310, slave2): org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at
Re: StorageLevel.MEMORY_AND_DISK_SER
I think i want to use persist then and write my intermediate RDDs to disk+mem. On Wed, Jul 1, 2015 at 8:28 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: I think persist api is internal to rdd whereas write api is for saving content on dist. Rdd persist will dump your obj bytes serialized on the disk.. If you wanna change that behavior you need to override the class serialization that your are storing in rdd.. On Jul 1, 2015 8:50 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: This is my write API. how do i integrate it here. protected def writeOutputRecords(detailRecords: RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) { val writeJob = new Job() val schema = SchemaUtil.outputSchema(_detail) AvroJob.setOutputKeySchema(writeJob, schema) val outputRecords = detailRecords.coalesce(100) outputRecords.saveAsNewAPIHadoopFile(outputDir, classOf[AvroKey[GenericRecord]], classOf[org.apache.hadoop.io.NullWritable], classOf[AvroKeyOutputFormat[GenericRecord]], writeJob.getConfiguration) } On Wed, Jul 1, 2015 at 8:11 AM, Koert Kuipers ko...@tresata.com wrote: rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) On Wed, Jul 1, 2015 at 11:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: How do i persist an RDD using StorageLevel.MEMORY_AND_DISK_SER ? -- Deepak -- Deepak -- Deepak
Re: custom RDD in java
If all you’re doing is just dumping tables from SQLServer to HDFS, have you looked at Sqoop? Otherwise, if you need to run this in Spark could you just use the existing JdbcRDD? From: Shushant Arora Date: Wednesday, July 1, 2015 at 10:19 AM To: user Subject: custom RDD in java Hi, Is it possible to write a custom RDD in Java? The requirement is: I have a list of SQL Server tables that need to be dumped into HDFS. So I have a List<String> tables = {dbname.tablename, dbname.tablename2, ...}; then JavaRDD<String> rdd = javasparkcontext.parallelize(tables); JavaRDD<String> tablecontent = rdd.map(new Function<String, Iterable<String>>(){fetch table and return populated iterable}); tablecontent.saveAsTextFile(hdfs path); In rdd.map(new Function<String, ...>) I cannot keep the complete table content in memory, so I want to create my own RDD to handle it. Thanks Shushant
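A sketch of the JdbcRDD route in Scala (the connection URL, key bounds and query are placeholders; JdbcRDD wants a query with two ? bind parameters that it uses to split the key range across partitions):

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val table = "dbname.tablename"   // placeholder
val rows = new JdbcRDD(
  sc,
  () => DriverManager.getConnection(
    "jdbc:sqlserver://dbhost;databaseName=dbname", "user", "password"),
  s"SELECT * FROM $table WHERE id >= ? AND id <= ?",
  lowerBound = 1L,
  upperBound = 1000000L,     // rough key range of the table
  numPartitions = 10,        // 10 concurrent JDBC readers
  mapRow = rs => (1 to rs.getMetaData.getColumnCount).map(rs.getString).mkString("\t"))

rows.saveAsTextFile(s"hdfs:///dumps/$table")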
BroadcastHashJoin when RDD is not cached
Hello, I have a straight forward use case of joining a large table with a smaller table. The small table is within the limit I set for spark.sql.autoBroadcastJoinThreshold. I notice that ShuffledHashJoin is used to perform the join. BroadcastHashJoin was used only when I pre-fetched and cached the small table. I understand that for typical broadcast we would have to read and collect() the small table in driver before broadcasting. Why not do this automatically for joins? That way stage1(read large table) and stage2(read small table) can still be run in parallel. Sort [emailId#19 ASC,date#0 ASC], true Exchange (RangePartitioning 24) Project [emailId#19,ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L] Filter ((lowerTime#22 = date#0) (date#0 = upperTime#23)) *ShuffledHashJoin* [ip#7], [ip#18], BuildRight Exchange (HashPartitioning 24) Project [ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L] PhysicalRDD [date#0,siteName#1,method#2,uri#3,query#4,port#5,userName#6], MapPartitionsRDD[6] at rddToDataFrameHolder at DataSourceReader.scala:25 Exchange (HashPartitioning 24) Project [emailId#19,scalaUDF(date#20) AS upperTime#23,ip#18,scalaUDF(date#20) AS lowerTime#22] PhysicalRDD [ip#18,emailId#19,date#20], MapPartitionsRDD[12] at rddToDataFrameHolder at DataSourceReader.scala:41 Srikanth
Re: StorageLevel.MEMORY_AND_DISK_SER
For that you need to change the serialize and deserialize behavior of your class. Preferably, you can use Kryo serializers and override the behavior. For details you can look at https://github.com/EsotericSoftware/kryo/blob/master/README.md On Jul 1, 2015 9:26 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I originally assumed that persisting is similar to writing. But it's not. Hence I want to change the behavior of intermediate persists. On Wed, Jul 1, 2015 at 8:46 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: So do you want to change the behavior of the persist api or write the rdd on disk... On Jul 1, 2015 9:13 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I think I want to use persist then and write my intermediate RDDs to disk+mem. On Wed, Jul 1, 2015 at 8:28 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: I think the persist api is internal to the rdd whereas the write api is for saving content on disk. Rdd persist will dump your obj bytes serialized on the disk.. If you want to change that behavior you need to override the serialization of the class you are storing in the rdd.. On Jul 1, 2015 8:50 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: This is my write API. How do I integrate it here? protected def writeOutputRecords(detailRecords: RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) { val writeJob = new Job() val schema = SchemaUtil.outputSchema(_detail) AvroJob.setOutputKeySchema(writeJob, schema) val outputRecords = detailRecords.coalesce(100) outputRecords.saveAsNewAPIHadoopFile(outputDir, classOf[AvroKey[GenericRecord]], classOf[org.apache.hadoop.io.NullWritable], classOf[AvroKeyOutputFormat[GenericRecord]], writeJob.getConfiguration) } On Wed, Jul 1, 2015 at 8:11 AM, Koert Kuipers ko...@tresata.com wrote: rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) On Wed, Jul 1, 2015 at 11:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: How do i persist an RDD using StorageLevel.MEMORY_AND_DISK_SER ? -- Deepak -- Deepak -- Deepak -- Deepak
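A minimal sketch (Scala) of switching the job to Kryo rather than hand-writing serialization; the registered class is the record type from the code quoted above, and its visibility here is an assumption:

import org.apache.spark.SparkConf

// Serialize persisted/shuffled objects with Kryo and register the record
// class so only a compact class id is written with each object.
val conf = new SparkConf()
  .setAppName("detail-records")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[DetailOutputRecord]))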
Re: StorageLevel.MEMORY_AND_DISK_SER
i original assumed that persisting is similar to writing. But its not. Hence i want to change the behavior of intermediate persists. On Wed, Jul 1, 2015 at 8:46 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: So do you want to change the behavior of persist api or write the rdd on disk... On Jul 1, 2015 9:13 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I think i want to use persist then and write my intermediate RDDs to disk+mem. On Wed, Jul 1, 2015 at 8:28 AM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: I think persist api is internal to rdd whereas write api is for saving content on dist. Rdd persist will dump your obj bytes serialized on the disk.. If you wanna change that behavior you need to override the class serialization that your are storing in rdd.. On Jul 1, 2015 8:50 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: This is my write API. how do i integrate it here. protected def writeOutputRecords(detailRecords: RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) { val writeJob = new Job() val schema = SchemaUtil.outputSchema(_detail) AvroJob.setOutputKeySchema(writeJob, schema) val outputRecords = detailRecords.coalesce(100) outputRecords.saveAsNewAPIHadoopFile(outputDir, classOf[AvroKey[GenericRecord]], classOf[org.apache.hadoop.io.NullWritable], classOf[AvroKeyOutputFormat[GenericRecord]], writeJob.getConfiguration) } On Wed, Jul 1, 2015 at 8:11 AM, Koert Kuipers ko...@tresata.com wrote: rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) On Wed, Jul 1, 2015 at 11:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: How do i persist an RDD using StorageLevel.MEMORY_AND_DISK_SER ? -- Deepak -- Deepak -- Deepak -- Deepak
StorageLevel.MEMORY_AND_DISK_SER
How do i persist an RDD using StorageLevel.MEMORY_AND_DISK_SER ? -- Deepak
Re: StorageLevel.MEMORY_AND_DISK_SER
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) On Wed, Jul 1, 2015 at 11:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: How do i persist an RDD using StorageLevel.MEMORY_AND_DISK_SER ? -- Deepak
Re: StorageLevel.MEMORY_AND_DISK_SER
This is my write API. how do i integrate it here. protected def writeOutputRecords(detailRecords: RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) { val writeJob = new Job() val schema = SchemaUtil.outputSchema(_detail) AvroJob.setOutputKeySchema(writeJob, schema) val outputRecords = detailRecords.coalesce(100) outputRecords.saveAsNewAPIHadoopFile(outputDir, classOf[AvroKey[GenericRecord]], classOf[org.apache.hadoop.io.NullWritable], classOf[AvroKeyOutputFormat[GenericRecord]], writeJob.getConfiguration) } On Wed, Jul 1, 2015 at 8:11 AM, Koert Kuipers ko...@tresata.com wrote: rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) On Wed, Jul 1, 2015 at 11:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: How do i persist an RDD using StorageLevel.MEMORY_AND_DISK_SER ? -- Deepak -- Deepak
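A sketch of how the persist call composes with the existing write method; writeOutputRecords itself does not need to change, since persist only marks the RDD to be cached (memory first, spilling serialized blocks to disk) the first time an action runs:

import org.apache.spark.storage.StorageLevel

val persisted = detailRecords.persist(StorageLevel.MEMORY_AND_DISK_SER)

writeOutputRecords(persisted, outputDir)   // first action materialises and caches
// ... any further jobs over `persisted` reuse the cached, serialized blocks ...

persisted.unpersist()                      // release the blocks when finished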
Re: Can a Spark Driver Program be a REST Service by itself?
I am using spark driver as a rest service. I used spray.io to make my app rest server. I think this is a good design for applications that you want to keep in long running mode.. On Jul 1, 2015 6:28 PM, Arush Kharbanda ar...@sigmoidanalytics.com wrote: You can try using Spark Jobserver https://github.com/spark-jobserver/spark-jobserver On Wed, Jul 1, 2015 at 4:32 PM, Spark Enthusiast sparkenthusi...@yahoo.in wrote: Folks, My Use case is as follows: My Driver program will be aggregating a bunch of Event Streams and acting on it. The Action on the aggregated events is configurable and can change dynamically. One way I can think of is to run the Spark Driver as a Service where a config push can be caught via an API that the Driver exports. Can I have a Spark Driver Program run as a REST Service by itself? Is this a common use case? Is there a better way to solve my problem? Thanks -- [image: Sigmoid Analytics] http://htmlsig.com/www.sigmoidanalytics.com *Arush Kharbanda* || Technical Teamlead ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
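Not spray, but a self-contained sketch of the same design using only the JDK's built-in HTTP server, to illustrate a long-running driver accepting config pushes (the port, endpoint path and config handling are made up for the example):

import java.net.InetSocketAddress
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import org.apache.spark.{SparkConf, SparkContext}

object RestDriver {
  @volatile var action: String = "print"   // illustrative dynamic config

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rest-driver"))

    val server = HttpServer.create(new InetSocketAddress(8090), 0)
    server.createContext("/config", new HttpHandler {
      def handle(ex: HttpExchange): Unit = {
        // The POST body becomes the action applied to subsequent batches.
        action = scala.io.Source.fromInputStream(ex.getRequestBody).mkString.trim
        val resp = s"action=$action\n".getBytes("UTF-8")
        ex.sendResponseHeaders(200, resp.length)
        ex.getResponseBody.write(resp)
        ex.close()
      }
    })
    server.start()   // the driver now serves REST while Spark jobs keep running
  }
}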
Re: s3 bucket access/read file
I think 2.6 failed to abruptly close streams that weren't fully read, which we observed as a huge performance hit. We had to backport the 2.7 improvements before being able to use it.
binaryFiles() for 1 million files, too much memory required
Once again I am trying to read a directory tree using binary files. My directory tree has a root dir ROOTDIR and subdirs where the files are located, i.e. ROOTDIR/1 ROOTDIR/2 ROOTDIR/.. ROOTDIR/100 A total of 1 mil files split into 100 sub dirs Using binaryFiles requires too much memory on the driver. I've also tried rdds of binaryFiles(each subdir) and then ++ those and rdd.saveAsObjectFile(outputDir). That causes a lot of memory to be required in the executors! What is the proper way to use binaryFiles with this number of files? Thanks
Re: Spark Dataframe 1.4 (GroupBy partial match)
You should probably write a UDF that uses a regular expression or other string munging to canonicalize the subject and then group on that derived column. On Tue, Jun 30, 2015 at 10:30 PM, Suraj Shetiya surajshet...@gmail.com wrote: Thanks Salih. :) The output of the groupby is as below.
2015-01-14  SEC Inquiry
2015-01-16  Re: SEC Inquiry
2015-01-18  Fwd: Re: SEC Inquiry
And subsequently, we would like to aggregate all messages with a particular reference subject. For instance, the question we are trying to answer could be: get the count of messages with a particular subject. Looking forward to any suggestion from you. On Tue, Jun 30, 2015 at 8:42 PM, Salih Oztop soz...@yahoo.com wrote: Hi Suraj, What will be your output after group by? Since GroupBy is for aggregations like sum, count etc. If you want to count the 2015 records then it is possible. Kind Regards Salih Oztop -- *From:* Suraj Shetiya surajshet...@gmail.com *To:* user@spark.apache.org *Sent:* Tuesday, June 30, 2015 3:05 PM *Subject:* Spark Dataframe 1.4 (GroupBy partial match) I have a dataset (trimmed and simplified) with 2 columns as below.
Date        Subject
2015-01-14  SEC Inquiry
2014-02-12  Happy birthday
2014-02-13  Re: Happy birthday
2015-01-16  Re: SEC Inquiry
2015-01-18  Fwd: Re: SEC Inquiry
I have imported the same into a Spark DataFrame. What I am looking at is grouping by the subject field (however, I need a partial match to identify the discussion topic). For example, in the above case I would like to group all messages whose subject contains "SEC Inquiry", which returns the following grouped frame:
2015-01-14  SEC Inquiry
2015-01-16  Re: SEC Inquiry
2015-01-18  Fwd: Re: SEC Inquiry
Another use case for a similar problem could be grouping by year (in the above example); that would mean a partial match on the date field, i.e. groupBy Date by matching the year as 2014 or 2015. Keenly looking forward to a reply/solution to the above. - Suraj
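A minimal sketch of the UDF approach suggested above, assuming a DataFrame df with date and subject columns; the regex that strips Re:/Fwd: prefixes is only an illustration of the canonicalization step.

    import org.apache.spark.sql.functions.udf

    // Collapse "Re:" / "Fwd:" variants onto the original subject line.
    val canonicalize = udf { subject: String =>
      subject.replaceAll("(?i)^((re|fwd):\\s*)+", "").trim
    }

    val withTopic = df.withColumn("topic", canonicalize(df("subject")))
    // All "SEC Inquiry" variants now land in one group.
    withTopic.groupBy("topic").count().show()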
Re: custom RDD in java
The list of tables is not large; the RDD is created on the table list to parallelise the work of fetching tables in multiple mappers at the same time. Since the time taken to fetch a table is significant, I can't run that sequentially. The content of a table fetched by a map job is large, so one option is to dump the content to HDFS using the filesystem API from inside the map function for every few rows fetched. I cannot keep the complete table in memory and then dump it to HDFS using the map function below: JavaRDD<Iterable<String>> tablecontent = tablelistrdd.map(new Function<String, Iterable<String>>() { public Iterable<String> call(String tablename) { ..make a JDBC connection, get the table data, populate it in a list and return that.. } }); tablecontent.saveAsTextFile(hdfspath); Here I wanted to create a custom RDD whose partitions would be in memory on multiple executors and contain parts of the table data. And I would have called saveAsTextFile on the custom RDD directly to save to HDFS. On Thu, Jul 2, 2015 at 12:59 AM, Feynman Liang fli...@databricks.com wrote: On Wed, Jul 1, 2015 at 7:19 AM, Shushant Arora shushantaror...@gmail.com wrote: JavaRDD<String> rdd = javasparkcontext.parallelize(tables); You are already creating an RDD in Java here ;) However, it's not clear to me why you'd want to make this an RDD. Is the list of tables so large that it doesn't fit on a single machine? If not, you may be better off spinning up one spark job for dumping each table in tables using a JDBC datasource https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases . On Wed, Jul 1, 2015 at 12:00 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: Sure, you can create custom RDDs. Haven’t done so in Java, but in Scala absolutely. From: Shushant Arora Date: Wednesday, July 1, 2015 at 1:44 PM To: Silvio Fiorito Cc: user Subject: Re: custom RDD in java OK, will evaluate these options, but is it possible to create an RDD in Java? On Wed, Jul 1, 2015 at 8:29 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: If all you’re doing is just dumping tables from SQLServer to HDFS, have you looked at Sqoop? Otherwise, if you need to run this in Spark could you just use the existing JdbcRDD? From: Shushant Arora Date: Wednesday, July 1, 2015 at 10:19 AM To: user Subject: custom RDD in java Hi, Is it possible to write a custom RDD in Java? The requirement is: I have a list of SQL Server tables that need to be dumped into HDFS. So I have a List<String> tables = {"dbname.tablename", "dbname.tablename2", ...}; then JavaRDD<String> rdd = javasparkcontext.parallelize(tables); JavaRDD<Iterable<String>> tablecontent = rdd.map(new Function<String, Iterable<String>>() { fetch table and return populated iterable }); tablecontent.saveAsTextFile(hdfs path); Inside rdd.map(new Function<String, ...>) I cannot keep the complete table content in memory, so I want to create my own RDD to handle it. Thanks Shushant
Re: custom RDD in java
Sure, you can create custom RDDs. Haven’t done so in Java, but in Scala absolutely. From: Shushant Arora Date: Wednesday, July 1, 2015 at 1:44 PM To: Silvio Fiorito Cc: user Subject: Re: custom RDD in java ok..will evaluate these options but is it possible to create RDD in java? On Wed, Jul 1, 2015 at 8:29 PM, Silvio Fiorito silvio.fior...@granturing.commailto:silvio.fior...@granturing.com wrote: If all you’re doing is just dumping tables from SQLServer to HDFS, have you looked at Sqoop? Otherwise, if you need to run this in Spark could you just use the existing JdbcRDD? From: Shushant Arora Date: Wednesday, July 1, 2015 at 10:19 AM To: user Subject: custom RDD in java Hi Is it possible to write custom RDD in java? Requirement is - I am having a list of Sqlserver tables need to be dumped in HDFS. So I have a ListString tables = {dbname.tablename,dbname.tablename2..}; then JavaRDDString rdd = javasparkcontext.parllelise(tables); JavaRDDString tablecontent = rdd.map(new FunctionString,IterableString){fetch table and return populate iterable} tablecontent.storeAsTextFile(hffs path); In rdd.map(new FunctionString,). I cannot keep complete table content in memory , so I want to creat my own RDD to handle it. Thanks Shushant
Re: Issue with parquet write after join (Spark 1.4.0)
I would still look at your executor logs. A count() is rewritten by the optimizer to be much more efficient because you don't actually need any of the columns. Also, writing parquet allocates quite a few large buffers. On Wed, Jul 1, 2015 at 5:42 AM, Pooja Jain pooja.ja...@gmail.com wrote: Join is happening successfully as I am able to do count() after the join. Error is coming only while trying to write in parquet format on hdfs. Thanks, Pooja. On Wed, Jul 1, 2015 at 1:06 PM, Akhil Das ak...@sigmoidanalytics.com wrote: It says: Caused by: java.net.ConnectException: Connection refused: slave2/...:54845 Could you look in the executor logs (stderr on slave2) and see what made it shut down? Since you are doing a join there's a high possibility of OOM etc. Thanks Best Regards On Wed, Jul 1, 2015 at 10:20 AM, Pooja Jain pooja.ja...@gmail.com wrote: Hi, We are using Spark 1.4.0 on hadoop using yarn-cluster mode via spark-submit. We are facing parquet write issue after doing dataframe joins We have a full data set and then an incremental data. We are reading them as dataframes, joining them, and then writing the data to the hdfs system in parquet format. We are getting the timeout error on the last partition. But if we do a count on the joined data it is working - which gives us the confidence that join is happening properly. Only in case of writing to the hdfs it is timing out. Code flow: // join two data frames - dfBase and dfIncr on primaryKey val joinedDF = dfBase.join(dfIncr, dfBase(primaryKey) === dfIncr(primaryKey), outer) // applying a reduce function on each row. val mergedDF = joinedDF.map(x = reduceFunc(x) ) //converting back to dataframe val newdf = Spark.getSqlContext().createDataFrame(mergedDF, dfSchema) //writing to parquet file newdf.write.parquet(hdfsfilepath) Getting following exception: 15/06/30 22:47:04 WARN spark.HeartbeatReceiver: Removing executor 26 with no recent heartbeats: 255766 ms exceeds timeout 24 ms 15/06/30 22:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 26 on slave2: Executor heartbeat timed out after 255766 ms 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 from TaskSet 7.0 15/06/30 22:47:04 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 7.0 (TID 216, slave2): ExecutorLostFailure (executor 26 lost) 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Starting task 6.1 in stage 7.0 (TID 310, slave2, PROCESS_LOCAL, 1910 bytes) 15/06/30 22:47:04 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 3) 15/06/30 22:47:04 INFO cluster.YarnClusterSchedulerBackend: Requesting to kill executor(s) 26 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 26 from BlockManagerMaster. 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(26, slave2, 54845) 15/06/30 22:47:04 INFO storage.BlockManagerMaster: Removed 26 successfully in removeExecutor 15/06/30 22:47:04 INFO yarn.YarnAllocator: Driver requested a total number of 26 executor(s). 15/06/30 22:47:04 INFO scheduler.ShuffleMapStage: ShuffleMapStage 6 is now unavailable on executor 26 (193/200, false) 15/06/30 22:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver requested to kill executor(s) 26. 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. 
slave2:51849 15/06/30 22:47:06 ERROR cluster.YarnClusterScheduler: Lost executor 26 on slave2: remote Rpc client disassociated 15/06/30 22:47:06 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 from TaskSet 7.0 15/06/30 22:47:06 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 5) 15/06/30 22:47:06 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 26 from BlockManagerMaster. 15/06/30 22:47:06 INFO storage.BlockManagerMaster: Removed 26 successfully in removeExecutor 15/06/30 22:47:06 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@slave2:51849] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. slave2:51849 15/06/30 22:47:21 WARN scheduler.TaskSetManager: Lost task 6.1 in stage 7.0 (TID 310, slave2): org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at
Re: custom RDD in java
AFAIK RDDs can only be created on the driver, not the executors. Also, `saveAsTextFile(...)` is an action and hence can also only be executed on the driver. As Silvio already mentioned, Sqoop may be a good option. On Wed, Jul 1, 2015 at 12:46 PM, Shushant Arora shushantaror...@gmail.com wrote: List of tables is not large , RDD is created on table list to parllelise the work of fetching tables in multiple mappers at same time.Since time taken to fetch a table is significant , so can't run that sequentially. Content of table fetched by a map job is large, so one option is to dump content to hdfs using filesystem api from inside map function for every few rows of table fetched. I cannot keep complete table in memory and then dump in hdfs using below map function- JavaRDDString tablecontent = tablelistrdd.map(new FunctionString,IterableString) {public IterableString call(String tablename){ ..make jdbc connection get table data and populate in list and return that.. } tablecontent .saveAsTextFile(hdfspath); Here I wanted to create customRDD- whose partitions would be in memory on multiple executors and contains parts of table data. And i would have called saveAsTextFile on customRDD directly to save in hdfs. On Thu, Jul 2, 2015 at 12:59 AM, Feynman Liang fli...@databricks.com wrote: On Wed, Jul 1, 2015 at 7:19 AM, Shushant Arora shushantaror...@gmail.com wrote: JavaRDDString rdd = javasparkcontext.parllelise(tables); You are already creating an RDD in Java here ;) However, it's not clear to me why you'd want to make this an RDD. Is the list of tables so large that it doesn't fit on a single machine? If not, you may be better off spinning up one spark job for dumping each table in tables using a JDBC datasource https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases . On Wed, Jul 1, 2015 at 12:00 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: Sure, you can create custom RDDs. Haven’t done so in Java, but in Scala absolutely. From: Shushant Arora Date: Wednesday, July 1, 2015 at 1:44 PM To: Silvio Fiorito Cc: user Subject: Re: custom RDD in java ok..will evaluate these options but is it possible to create RDD in java? On Wed, Jul 1, 2015 at 8:29 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: If all you’re doing is just dumping tables from SQLServer to HDFS, have you looked at Sqoop? Otherwise, if you need to run this in Spark could you just use the existing JdbcRDD? From: Shushant Arora Date: Wednesday, July 1, 2015 at 10:19 AM To: user Subject: custom RDD in java Hi Is it possible to write custom RDD in java? Requirement is - I am having a list of Sqlserver tables need to be dumped in HDFS. So I have a ListString tables = {dbname.tablename,dbname.tablename2..}; then JavaRDDString rdd = javasparkcontext.parllelise(tables); JavaRDDString tablecontent = rdd.map(new FunctionString,IterableString){fetch table and return populate iterable} tablecontent.storeAsTextFile(hffs path); In rdd.map(new FunctionString,). I cannot keep complete table content in memory , so I want to creat my own RDD to handle it. Thanks Shushant
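For reference, a sketch of the per-table JDBC data source route suggested above, assuming Spark 1.4 with hypothetical connection details and output paths; each table is read and written by the executors, so no table has to fit in driver memory.

    import java.util.Properties

    // Hypothetical SQL Server connection details.
    val url = "jdbc:sqlserver://dbhost:1433;databaseName=dbname"
    val props = new Properties()
    props.setProperty("user", "username")
    props.setProperty("password", "password")

    val tables = Seq("dbname.tablename", "dbname.tablename2")

    // One read and one write per table.
    for (table <- tables) {
      val df = sqlContext.read.jdbc(url, table, props)
      df.rdd
        .map(_.mkString("\t"))                   // flatten each Row to a tab-separated line
        .saveAsTextFile(s"hdfs:///dumps/$table") // hypothetical HDFS output path
    }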
Re: Custom order by in Spark SQL
Easiest way to do this today is to define a UDF that maps from string to a number. On Wed, Jul 1, 2015 at 10:25 AM, Mick Davies michael.belldav...@gmail.com wrote: Hi, Is there a way to specify a custom order by (Ordering) on a column in Spark SQL In particular I would like to have the order by applied to a currency column not to be alpha, but something like - USD, EUR, JPY, GBP etc.. I saw an earlier post on UDTs and ordering (which I can't seem to find in this archive, http://mail-archives.us.apache.org/mod_mbox/spark-user/201503.mbox/%3CCAFGcCdWWCFCwVp7+BCaPQ=6uupmyjcbhqyjn9txeu45hjg4...@mail.gmail.com%3E ), which is somewhat related to this question. Thanks Mick -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-order-by-in-Spark-SQL-tp23569.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
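A minimal sketch of that UDF, assuming a DataFrame df with a currency column; the rank table and the Int.MaxValue fallback for unknown currencies are illustrative choices.

    import org.apache.spark.sql.functions.udf

    // Map each currency to its position in the desired order; unknown currencies sort last.
    val currencyRank = udf { ccy: String =>
      val order = Seq("USD", "EUR", "JPY", "GBP")
      val i = order.indexOf(ccy)
      if (i == -1) Int.MaxValue else i
    }

    // Order by the derived numeric column instead of the raw (alphabetical) string.
    df.orderBy(currencyRank(df("currency"))).show()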
Re: Check for null in PySpark DataFrame
There is an isNotNull function on any column. df._1.isNotNull() or from pyspark.sql.functions import * col('myColumn').isNotNull() On Wed, Jul 1, 2015 at 3:07 AM, Olivier Girardot ssab...@gmail.com wrote: I must admit I've been using the same back-to-SQL strategy for now :p So I'd be glad to have insights into that too. On Tue, Jun 30, 2015 at 11:28 PM, pedro ski.rodrig...@gmail.com wrote: I am trying to find what is the correct way to programmatically check for null values for rows in a dataframe. For example, below is the code using pyspark and sql: df = sqlContext.createDataFrame(sc.parallelize([(1, None), (2, 'a'), (3, 'b'), (4, None)])) df.where('_2 is not null').count() However, this won't work: df.where(df._2 != None).count() It seems there is no native Python way with DataFrames to do this, but I find that difficult to believe and more likely that I am missing the right way to do this. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Check-for-null-in-PySpark-DataFrame-tp23553.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: BroadcastHashJoin when RDD is not cached
We don't know that the table is small unless you cache it. In Spark 1.5 you'll be able to give us a hint though ( https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L581 ) On Wed, Jul 1, 2015 at 8:30 AM, Srikanth srikanth...@gmail.com wrote: Hello, I have a straight forward use case of joining a large table with a smaller table. The small table is within the limit I set for spark.sql.autoBroadcastJoinThreshold. I notice that ShuffledHashJoin is used to perform the join. BroadcastHashJoin was used only when I pre-fetched and cached the small table. I understand that for typical broadcast we would have to read and collect() the small table in driver before broadcasting. Why not do this automatically for joins? That way stage1(read large table) and stage2(read small table) can still be run in parallel. Sort [emailId#19 ASC,date#0 ASC], true Exchange (RangePartitioning 24) Project [emailId#19,ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L] Filter ((lowerTime#22 = date#0) (date#0 = upperTime#23)) *ShuffledHashJoin* [ip#7], [ip#18], BuildRight Exchange (HashPartitioning 24) Project [ip#7,date#0,siteName#1,uri#3,status#12,csbytes#16L] PhysicalRDD [date#0,siteName#1,method#2,uri#3,query#4,port#5,userName#6], MapPartitionsRDD[6] at rddToDataFrameHolder at DataSourceReader.scala:25 Exchange (HashPartitioning 24) Project [emailId#19,scalaUDF(date#20) AS upperTime#23,ip#18,scalaUDF(date#20) AS lowerTime#22] PhysicalRDD [ip#18,emailId#19,date#20], MapPartitionsRDD[12] at rddToDataFrameHolder at DataSourceReader.scala:41 Srikanth
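Once the Spark 1.5 hint linked above is available, its use looks roughly like this; largeDF and smallDF are hypothetical names, and the ip join key mirrors the columns in the plan shown above.

    import org.apache.spark.sql.functions.broadcast

    // Mark the small dimension table so the planner collects and broadcasts it,
    // letting the large table be joined without a shuffle.
    val joined = largeDF.join(broadcast(smallDF), largeDF("ip") === smallDF("ip"))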
Re: sparkR could not find function textFile
Thanks Shivram. Your suggestion in stack overflow regarding this did work. Thanks again. Regards, Sourav On Wed, Jul 1, 2015 at 10:21 AM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: You can check my comment below the answer at http://stackoverflow.com/a/30959388/4577954. BTW we added a new option to sparkR.init to pass in packages and that should be a part of 1.5 Shivaram On Wed, Jul 1, 2015 at 10:03 AM, Sourav Mazumder sourav.mazumde...@gmail.com wrote: Hi, Piggybacking on this discussion. I'm trying to achieve the same, reading a csv file, from RStudio. Where I'm stuck is how to supply some additional package from RStudio to spark.init() as sparkR.init does() not provide an option to specify additional package. I tried following codefrom RStudio. It is giving me error Error in callJMethod(sqlContext, load, source, options) : Invalid jobj 1. If SparkR was restarted, Spark operations need to be re-executed. -- Sys.setenv(SPARK_HOME=C:\\spark-1.4.0-bin-hadoop2.6) .libPaths(c(file.path(Sys.getenv(SPARK_HOME), R, lib),.libPaths())) library(SparkR) sparkR.stop() sc - sparkR.init(master=local[2], sparkEnvir = list(spark.executor.memory=1G), sparkJars=C:\\spark-1.4.0-bin-hadoop2.6\\lib\\spark-csv_2.11-1.1.0.jar) /* I have downloaded this spark-csv jar and kept it in lib folder of Spark */ sqlContext - sparkRSQL.init(sc) plutoMN - read.df(sqlContext, C:\\Users\\Sourav\\Work\\SparkDataScience\\PlutoMN.csv, source = com.databricks.spark.csv). -- However, I also tried this from shell as 'sparkR --package com.databricks:spark-csv_2.11:1.1.0. This time I used the following code and it works all fine. sqlContext - sparkRSQL.init(sc) plutoMN - read.df(sqlContext, C:\\Users\\Sourav\\Work\\SparkDataScience\\PlutoMN.csv, source = com.databricks.spark.csv). Any idea how to achieve the same from RStudio ? Regards, On Thu, Jun 25, 2015 at 2:38 PM, Wei Zhou zhweisop...@gmail.com wrote: I tried out the solution using spark-csv package, and it worked fine now :) Thanks. Yes, I'm playing with a file with all columns as String, but the real data I want to process are all doubles. I'm just exploring what sparkR can do versus regular scala spark, as I am by heart a R person. 2015-06-25 14:26 GMT-07:00 Eskilson,Aleksander alek.eskil...@cerner.com : Sure, I had a similar question that Shivaram was able fast for me, the solution is implemented using a separate DataBrick’s library. Check out this thread from the email archives [1], and the read.df() command [2]. CSV files can be a bit tricky, especially with inferring their schemas. Are you using just strings as your column types right now? Alek [1] -- http://apache-spark-developers-list.1001551.n3.nabble.com/CSV-Support-in-SparkR-td12559.html [2] -- https://spark.apache.org/docs/latest/api/R/read.df.html From: Wei Zhou zhweisop...@gmail.com Date: Thursday, June 25, 2015 at 4:15 PM To: shiva...@eecs.berkeley.edu shiva...@eecs.berkeley.edu Cc: Aleksander Eskilson alek.eskil...@cerner.com, user@spark.apache.org user@spark.apache.org Subject: Re: sparkR could not find function textFile Thanks to both Shivaram and Alek. Then if I want to create DataFrame from comma separated flat files, what would you recommend me to do? One way I can think of is first reading the data as you would do in r, using read.table(), and then create spark DataFrame out of that R dataframe, but it is obviously not scalable. 2015-06-25 13:59 GMT-07:00 Shivaram Venkataraman shiva...@eecs.berkeley.edu: The `head` function is not supported for the RRDD that is returned by `textFile`. 
You can run `take(lines, 5L)`. I should add a warning here that the RDD API in SparkR is private because we might not support it in the upcoming releases. So if you can use the DataFrame API for your application you should try that out. Thanks Shivaram On Thu, Jun 25, 2015 at 1:49 PM, Wei Zhou zhweisop...@gmail.com wrote: Hi Alek, Just a follow up question. This is what I did in sparkR shell: lines - SparkR:::textFile(sc, ./README.md) head(lines) And I am getting error: Error in x[seq_len(n)] : object of type 'S4' is not subsettable I'm wondering what did I do wrong. Thanks in advance. Wei 2015-06-25 13:44 GMT-07:00 Wei Zhou zhweisop...@gmail.com: Hi Alek, Thanks for the explanation, it is very helpful. Cheers, Wei 2015-06-25 13:40 GMT-07:00 Eskilson,Aleksander alek.eskil...@cerner.com: Hi there, The tutorial you’re reading there was written before the merge of SparkR for Spark 1.4.0 For the merge, the RDD API (which includes the textFile() function) was made private, as the devs felt many of its functions were too low level. They focused instead on finishing the DataFrame API which supports local, HDFS, and Hive/HBase file reads. In the meantime, the devs are trying to determine which functions of the RDD API, if any, should be made
Use of Apache Spark with R package SNOW, or perhaps Hadoop YARN with same SNOW?
Pretty much as in the subject. Snow is an R package for doing mapping of computations onto processes in one or more servers that's simple to use, and requires little configuration. Organizations sometimes use Hadoop and Spark to manage large clusters of processors. Is there a way for snow to coexist with Spark? My personal experience is using snow's SOCK interface (as opposed to MPI or PVM or NWS). More on snow is available at http://homepage.stat.uiowa.edu/~luke/R/cluster/cluster.html -- Jan Galkowski Senior System Software Engineer, II. Akamai Technologies, Cambridge, MA Custom Analytics Research Development -- The magic of statistics cannot put actual numbers where there are none. -- Paul Holland --
Re: How to disable parquet schema merging in 1.4?
With Spark 1.4, you may use the data source option mergeSchema to control it: sqlContext.read.option("mergeSchema", "false").parquet("some/path") or CREATE TABLE t USING parquet OPTIONS (mergeSchema "false", path "some/path") We're considering disabling schema merging by default in 1.5.0 since it brings unnecessary performance cost when schema evolution is not a problem. Cheng On 6/23/15 2:20 AM, Rex Xiong wrote: I remember in a previous PR, schema merging can be disabled by setting spark.sql.hive.convertMetastoreParquet.mergeSchema to false. But in the 1.4 release, I don't see this config anymore; is there a new way to do it? Thanks - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark driver using Spark Streaming shows increasing memory/CPU usage
[Apologies if the end of the last email was only included as an attachment - MacMail seems to do that with the rest of the message if an attachment appears inline. I‘m sending again for clarity.] Hi Tathagata, Thanks for your quick reply! I’ll add some more detail below about what I’m doing - I’ve tried a lot of variations on the code to debug this, with monitoring enabled, but I didn’t want to overwhelm the issue description to start with ;-) On 30 Jun 2015, at 19:30, Tathagata Das t...@databricks.commailto:t...@databricks.com wrote: Could you give more information on the operations that you are using? The code outline? And what do you mean by Spark Driver receiver events? If the driver is receiving events, how is it being sent to the executors. The events are just objects that represent actions a user takes. They contain a user id, a type and some other info, and get dumped into a MongoDB and then picked out by the Receiver. This ReceiverBSONObject runs a thread which periodically polls the db, processes new events into DBObjects and calls Receiver.store() to hand each one off to an Executor. BTW, for memory usages, I strongly recommend using jmap --histo:live to see what are the type of objects that is causing most memory usage? I’ve been running both jconsole and VisualVM to monitor the processes, and when memory usage is high it is overwhelmingly due to byte arrays. I’ve read that sometimes performing operations like sorting an RDD can lead to unreachable byte arrays (https://spark-project.atlassian.net/browse/SPARK-1001). I’ve not come across any reports that quite match our use case though. The groupByKey step seems to be a significant creator of byte arrays in my case. I’ll attach an outline of the code I’m using - I’ve tried to reduce this to the essentials; it won’t compile but should display ok in an IDE. A note that our MongoDBReceiver class uses StorageLevel.MEMORY_AND_DISK_SER() which will spill partitions that don't fit in memory to disk… as serialized Java objects (one byte array per partition)”. I wondered if this might be a contributor to the problem, but our partitions are very small. Perhaps the partitions are not getting cleared up for some reason. Thanks again for taking this up. Spark Streaming has been very useful for us! Neil SparkDriverOutline.java Description: SparkDriverOutline.java TDOn Tue, Jun 30, 2015 at 9:48 AM, easyonthemayo neil.m...@velocityww.com wrote:I have a Spark program which exhibits increasing resource usage. Spark Streaming (https://spark.apache.org/streaming/) is used to provide the data source. The Spark Driver class receives "events" by querying a MongoDB in a custom JavaReceiverInputDStream. These events are then transformed via mapToPair(), which creates tuples mapping an id to each event. The stream is partitioned and we run a groupByKey(). Finally the events are processed by foreachRDD(). Running it for several hours on a standalone cluster, a clear trend emerges of both CPU and heap memory usage increasing. This occurs even if the data source offers no events, so there is no actual processing to perform. Similarly, omitting the bulk of processing code within foreachRDD() does not eliminate the problem. I've tried eliminating steps in the process to identify the culprit, and it looks like it's the partitioning step that prompts the CPU usage to increase over time. Has anyone else experienced this sort of behaviour? 
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-driver-using-Spark-Streaming-shows-increasing-memory-CPU-usage-tp23545.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Retrieve hadoop conf object from Python API
To close the loop. This should work: sc._jsc.hadoopConfiguration See this method in JavaSparkContext : def hadoopConfiguration(): Configuration = { sc.hadoopConfiguration On Tue, Jun 30, 2015 at 5:52 PM, Ted Yu yuzhih...@gmail.com wrote: Minor correction: It should be sc._jsc Cheers On Tue, Jun 30, 2015 at 4:23 PM, ayan guha guha.a...@gmail.com wrote: There is a sc._jsc_ which you can access to get/set hadoop conf. On Wed, Jul 1, 2015 at 7:41 AM, Richard Ding pigu...@gmail.com wrote: Hi, I noticed that, in Scala API, one can call hadoopConfiguration on SparkContext to retrieve the hadoop configuration object which is very handy in modifying certain hadoop properties at runtime. But there is no corresponding method in Python API. Will this method be added to Python API in a later release? In the mean time, what will be the work around (of setting hadoop properties at runtime) using Python API? Thanks, Richard -- Best Regards, Ayan Guha
Re: Subsecond queries possible?
If you take bitmap indices out of sybase then I am guessing spark sql will be at par with sybase ? On that note are there plans of integrating indexed rdd ideas to spark sql to build indices ? Is there a JIRA tracking it ? On Jun 30, 2015 7:29 PM, Eric Pederson eric...@gmail.com wrote: Hi Debasish: We have the same dataset running on SybaseIQ and after the caches are warm the queries come back in about 300ms. We're looking at options to relieve overutilization and to bring down licensing costs. I realize that Spark may not be the best fit for this use case but I'm interested to see how far it can be pushed. Thanks for your help! -- Eric On Tue, Jun 30, 2015 at 5:28 PM, Debasish Das debasish.da...@gmail.com wrote: I got good runtime improvement from hive partitioninp, caching the dataset and increasing the cores through repartition...I think for your case generating mysql style indexing will help further..it is not supported in spark sql yet... I know the dataset might be too big for 1 node mysql but do you have a runtime estimate from running the same query on mysql with appropriate column indexing ? That should give us a good baseline number... For my case at least I could not put the data on 1 node mysql as it was big... If you can write the problem in a document view you can use a document store like solr/elastisearch to boost runtime...the reverse indices can get you subsecond latencies...again the schema design matters for that and you might have to let go some of sql expressiveness (like balance in a predefined bucket might be fine but looking for the exact number might be slow)
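For concreteness, a sketch of the "cache the dataset and add cores via repartition" tuning mentioned above; the path, table name, partition count and query are all placeholders.

    // Spread the scan across more cores, then keep the data in the in-memory columnar cache.
    val accounts = sqlContext.read.parquet("/data/accounts").repartition(48)
    accounts.registerTempTable("accounts")
    sqlContext.cacheTable("accounts")

    // Repeated queries now hit the cached columnar data.
    sqlContext.sql("SELECT currency, SUM(balance) FROM accounts GROUP BY currency").show()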
Re: coalesce on dataFrame
You can use df.repartition(1) in Spark 1.4. See here https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala#L1396 . Best, Burak On Wed, Jul 1, 2015 at 3:05 AM, Olivier Girardot ssab...@gmail.com wrote: PySpark or Spark (scala) ? When you use coalesce with anything but a column you must use a literal like that in PySpark : from pyspark.sql import functions as F F.coalesce(df.a, F.lit(True)) Le mer. 1 juil. 2015 à 12:03, Ewan Leith ewan.le...@realitymine.com a écrit : It's in spark 1.4.0, or should be at least: https://issues.apache.org/jira/browse/SPARK-6972 Ewan -Original Message- From: Hafiz Mujadid [mailto:hafizmujadi...@gmail.com] Sent: 01 July 2015 08:23 To: user@spark.apache.org Subject: coalesce on dataFrame How can we use coalesce(1, true) on dataFrame? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-on-dataFrame-tp23564.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
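A minimal Scala sketch of the repartition(1) route, assuming a hypothetical output path; note that a single partition means a single task writes all the data.

    // Collapse to one partition so the job writes a single output file.
    df.repartition(1).write.parquet("/tmp/single-file-output")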
spark.streaming.receiver.maxRate Not taking effect
Hi, I have set spark.streaming.receiver.maxRate to 100. My batch interval is 4sec but still sometimes there are more than 400 records per batch. I am using spark 1.2.0. Regards,Laeeq
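For reference, a sketch of the configuration being described, assuming a receiver-based stream with a 4-second batch interval; whether the limit is actually honored on 1.2.0 is the open question here.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Cap each receiver at 100 records/second; with a 4-second batch interval that
    // should bound a batch at roughly 400 records once the limit takes effect.
    val conf = new SparkConf()
      .setAppName("MaxRateExample")
      .set("spark.streaming.receiver.maxRate", "100")
    val ssc = new StreamingContext(conf, Seconds(4))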
import errors with Eclipse Scala
Hi Team, Just installed Eclipse with the Scala plugin to benefit from an IDE environment and I faced the problem that any import statement gives me an error. For example:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql._
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods._
All report errors of the type: “object apache is not member of package org” or “object json4s is not member of package org”. How can I resolve this? Thanks, Stefan Panayotov, PhD email: spanayo...@msn.com spanayo...@outlook.com spanayo...@comcast.net
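Those errors usually just mean the Spark and json4s jars are not on the project's build path. One way to fix it, assuming an sbt-managed project imported into Eclipse; the versions below are assumptions matching Spark 1.4.

    // build.sbt — declaring the dependencies lets the IDE resolve the imports above.
    name := "spark-app"
    scalaVersion := "2.10.4"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"     % "1.4.0",
      "org.apache.spark" %% "spark-sql"      % "1.4.0",
      "org.apache.spark" %% "spark-hive"     % "1.4.0",  // provides org.apache.spark.sql.hive.HiveContext
      "org.json4s"       %% "json4s-jackson" % "3.2.10"
    )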
Re: breeze.linalg.DenseMatrix not found
I think the issue was NOT with spark. I was running a spark program that dumped output to a binary file and then calling a scala program to read it and write out Matrix Market format files. The issue seems to have been with the classpath on the scala program, and went away when I added the spark jar to the classpath. Thanks for your help! Alex On Tue, Jun 30, 2015 at 9:11 AM, Burak Yavuz brk...@gmail.com wrote: How does your build file look? Are you possibly using wrong Scala versions? Have you added Breeze as a dependency to your project? If so which version? Thanks, Burak On Mon, Jun 29, 2015 at 3:45 PM, AlexG swift...@gmail.com wrote: I get the same error even when I define covOperator not to use a matrix at all: def covOperator(v : BDV[Double]) :BDV[Double] = { v } -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/breeze-linalg-DenseMatrix-not-found-tp23537p23538.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Custom order by in Spark SQL
Hi, Is there a way to specify a custom order by (Ordering) on a column in Spark SQL In particular I would like to have the order by applied to a currency column not to be alpha, but something like - USD, EUR, JPY, GBP etc.. I saw an earlier post on UDTs and ordering (which I can't seem to find in this archive, http://mail-archives.us.apache.org/mod_mbox/spark-user/201503.mbox/%3CCAFGcCdWWCFCwVp7+BCaPQ=6uupmyjcbhqyjn9txeu45hjg4...@mail.gmail.com%3E), which is somewhat related to this question. Thanks Mick -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Custom-order-by-in-Spark-SQL-tp23569.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark.streaming.receiver.maxRate Not taking effect
This might be related: SPARK-6985 Cheers On Wed, Jul 1, 2015 at 10:27 AM, Laeeq Ahmed laeeqsp...@yahoo.com.invalid wrote: Hi, I have set spark.streaming.receiver.maxRate to 100. My batch interval is 4sec but still sometimes there are more than 400 records per batch. I am using spark 1.2.0. Regards, Laeeq
Re: How to recover in case user errors in streaming
Hi TD, Why don’t we have OnBatchError or similar method in StreamingListener ? Also, is StreamingListener only for receiver based approach or does it work for Kafka Direct API / File Based Streaming as well ? Regards, Amit From: Tathagata Das t...@databricks.commailto:t...@databricks.com Date: Monday, June 29, 2015 at 5:24 PM To: amit assudani aassud...@impetus.commailto:aassud...@impetus.com Cc: Cody Koeninger c...@koeninger.orgmailto:c...@koeninger.org, user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: How to recover in case user errors in streaming I recommend writing using dstream.foreachRDD, and then rdd.saveAsNewAPIHadoopFile inside try catch. See the implementation of dstream.saveAsNewAPIHadoopFiles https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L716 On Mon, Jun 29, 2015 at 8:44 AM, Amit Assudani aassud...@impetus.commailto:aassud...@impetus.com wrote: Also, how do you suggest catching exceptions while using with connector API like, saveAsNewAPIHadoopFiles ? From: amit assudani aassud...@impetus.commailto:aassud...@impetus.com Date: Monday, June 29, 2015 at 9:55 AM To: Tathagata Das t...@databricks.commailto:t...@databricks.com Cc: Cody Koeninger c...@koeninger.orgmailto:c...@koeninger.org, user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: How to recover in case user errors in streaming Thanks TD, this helps. Looking forward to some fix where framework handles the batch failures by some callback methods. This will help not having to write try/catch in every transformation / action. Regards, Amit From: Tathagata Das t...@databricks.commailto:t...@databricks.com Date: Saturday, June 27, 2015 at 5:14 AM To: amit assudani aassud...@impetus.commailto:aassud...@impetus.com Cc: Cody Koeninger c...@koeninger.orgmailto:c...@koeninger.org, user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: How to recover in case user errors in streaming I looked at the code and found that batch exceptions are indeed ignored. This is something that is worth fixing, that batch exceptions should not be silently ignored. Also, you can catch failed batch jobs (irrespective of the number of retries) by catch the exception in foreachRDD. Here is an example. dstream.foreachRDD { rdd = try { } catch { } } This will catch failures at the granularity of the job, after all the max retries of a task has been done. But it will be hard to filter and find the push the failed record(s) somewhere. To do that, I would do use rdd.foreach or rdd.foreachPartition, inside which I would catch the exception and push that record out to another Kafka topic, and continue normal processing of other records. This would prevent the task process the partition from failing (as you are catching the bad records). 
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { iterator =>
    // Create Kafka producer for bad records
    iterator.foreach { record =>
      try {
        // process record
      } catch {
        case ExpectedException =>
          // publish bad record to error topic in Kafka using above producer
      }
    }
  }
}
TD PS: Apologies for the Scala examples, hope you get the idea :) On Fri, Jun 26, 2015 at 9:56 AM, Amit Assudani aassud...@impetus.com wrote: Also, I get TaskContext.get() null when used in the foreach function below ( I get it when I use it in map, but the whole point here is to handle something that is breaking in action ). Please help. :( From: amit assudani aassud...@impetus.com Date: Friday, June 26, 2015 at 11:41 AM To: Cody Koeninger c...@koeninger.org Cc: user@spark.apache.org, Tathagata Das t...@databricks.com Subject: Re: How to recover in case user errors in streaming Hmm, not sure why, but when I run this code, it always keeps on consuming from Kafka and proceeds ignoring the previous failed batches. Also, now that I get the attempt number from TaskContext and I have information of max retries, I am supposed to handle it in the try/catch block, but does it mean I’ve to handle these kinds of exceptions / errors in every transformation step ( map, reduce, transform, etc. )? Isn’t there any callback where it says it has been retried the max number of times, and before being ignored you’ve a handle to do whatever you want to do with the batch / message in hand. Regards, Amit From: Cody Koeninger c...@koeninger.org Date: Friday, June 26, 2015 at 11:32 AM To: amit assudani aassud...@impetus.com
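Pulling the thread's advice together, a minimal sketch of catching a failed batch inside foreachRDD; it substitutes a plain saveAsTextFile for the saveAsNewAPIHadoopFiles call to keep the types simple, and dstream plus the output path are assumed from the surrounding application.

    // Do the save inside foreachRDD so a failed batch can be caught instead of
    // being silently ignored by the streaming scheduler.
    dstream.foreachRDD { (rdd, batchTime) =>
      try {
        rdd.saveAsTextFile(s"/output/batch-${batchTime.milliseconds}")
      } catch {
        case e: Exception =>
          // Reached only after all task retries for the batch are exhausted;
          // log it (or divert the batch's data) and keep the stream running.
          println(s"Batch at $batchTime failed: ${e.getMessage}")
      }
    }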
Re: sparkR could not find function textFile
You can check my comment below the answer at http://stackoverflow.com/a/30959388/4577954. BTW we added a new option to sparkR.init to pass in packages and that should be a part of 1.5 Shivaram On Wed, Jul 1, 2015 at 10:03 AM, Sourav Mazumder sourav.mazumde...@gmail.com wrote: Hi, Piggybacking on this discussion. I'm trying to achieve the same, reading a csv file, from RStudio. Where I'm stuck is how to supply some additional package from RStudio to spark.init() as sparkR.init does() not provide an option to specify additional package. I tried following codefrom RStudio. It is giving me error Error in callJMethod(sqlContext, load, source, options) : Invalid jobj 1. If SparkR was restarted, Spark operations need to be re-executed. -- Sys.setenv(SPARK_HOME=C:\\spark-1.4.0-bin-hadoop2.6) .libPaths(c(file.path(Sys.getenv(SPARK_HOME), R, lib),.libPaths())) library(SparkR) sparkR.stop() sc - sparkR.init(master=local[2], sparkEnvir = list(spark.executor.memory=1G), sparkJars=C:\\spark-1.4.0-bin-hadoop2.6\\lib\\spark-csv_2.11-1.1.0.jar) /* I have downloaded this spark-csv jar and kept it in lib folder of Spark */ sqlContext - sparkRSQL.init(sc) plutoMN - read.df(sqlContext, C:\\Users\\Sourav\\Work\\SparkDataScience\\PlutoMN.csv, source = com.databricks.spark.csv). -- However, I also tried this from shell as 'sparkR --package com.databricks:spark-csv_2.11:1.1.0. This time I used the following code and it works all fine. sqlContext - sparkRSQL.init(sc) plutoMN - read.df(sqlContext, C:\\Users\\Sourav\\Work\\SparkDataScience\\PlutoMN.csv, source = com.databricks.spark.csv). Any idea how to achieve the same from RStudio ? Regards, On Thu, Jun 25, 2015 at 2:38 PM, Wei Zhou zhweisop...@gmail.com wrote: I tried out the solution using spark-csv package, and it worked fine now :) Thanks. Yes, I'm playing with a file with all columns as String, but the real data I want to process are all doubles. I'm just exploring what sparkR can do versus regular scala spark, as I am by heart a R person. 2015-06-25 14:26 GMT-07:00 Eskilson,Aleksander alek.eskil...@cerner.com : Sure, I had a similar question that Shivaram was able fast for me, the solution is implemented using a separate DataBrick’s library. Check out this thread from the email archives [1], and the read.df() command [2]. CSV files can be a bit tricky, especially with inferring their schemas. Are you using just strings as your column types right now? Alek [1] -- http://apache-spark-developers-list.1001551.n3.nabble.com/CSV-Support-in-SparkR-td12559.html [2] -- https://spark.apache.org/docs/latest/api/R/read.df.html From: Wei Zhou zhweisop...@gmail.com Date: Thursday, June 25, 2015 at 4:15 PM To: shiva...@eecs.berkeley.edu shiva...@eecs.berkeley.edu Cc: Aleksander Eskilson alek.eskil...@cerner.com, user@spark.apache.org user@spark.apache.org Subject: Re: sparkR could not find function textFile Thanks to both Shivaram and Alek. Then if I want to create DataFrame from comma separated flat files, what would you recommend me to do? One way I can think of is first reading the data as you would do in r, using read.table(), and then create spark DataFrame out of that R dataframe, but it is obviously not scalable. 2015-06-25 13:59 GMT-07:00 Shivaram Venkataraman shiva...@eecs.berkeley.edu: The `head` function is not supported for the RRDD that is returned by `textFile`. You can run `take(lines, 5L)`. I should add a warning here that the RDD API in SparkR is private because we might not support it in the upcoming releases. 
So if you can use the DataFrame API for your application you should try that out. Thanks Shivaram On Thu, Jun 25, 2015 at 1:49 PM, Wei Zhou zhweisop...@gmail.com wrote: Hi Alek, Just a follow up question. This is what I did in sparkR shell: lines - SparkR:::textFile(sc, ./README.md) head(lines) And I am getting error: Error in x[seq_len(n)] : object of type 'S4' is not subsettable I'm wondering what did I do wrong. Thanks in advance. Wei 2015-06-25 13:44 GMT-07:00 Wei Zhou zhweisop...@gmail.com: Hi Alek, Thanks for the explanation, it is very helpful. Cheers, Wei 2015-06-25 13:40 GMT-07:00 Eskilson,Aleksander alek.eskil...@cerner.com: Hi there, The tutorial you’re reading there was written before the merge of SparkR for Spark 1.4.0 For the merge, the RDD API (which includes the textFile() function) was made private, as the devs felt many of its functions were too low level. They focused instead on finishing the DataFrame API which supports local, HDFS, and Hive/HBase file reads. In the meantime, the devs are trying to determine which functions of the RDD API, if any, should be made public again. You can see the rationale behind this decision on the issue’s JIRA [1]. You can still make use of those now private RDD functions by prepending the function call with the SparkR private
Re: custom RDD in java
ok..will evaluate these options but is it possible to create RDD in java? On Wed, Jul 1, 2015 at 8:29 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: If all you’re doing is just dumping tables from SQLServer to HDFS, have you looked at Sqoop? Otherwise, if you need to run this in Spark could you just use the existing JdbcRDD? From: Shushant Arora Date: Wednesday, July 1, 2015 at 10:19 AM To: user Subject: custom RDD in java Hi Is it possible to write custom RDD in java? Requirement is - I am having a list of Sqlserver tables need to be dumped in HDFS. So I have a ListString tables = {dbname.tablename,dbname.tablename2..}; then JavaRDDString rdd = javasparkcontext.parllelise(tables); JavaRDDString tablecontent = rdd.map(new FunctionString,IterableString){fetch table and return populate iterable} tablecontent.storeAsTextFile(hffs path); In rdd.map(new FunctionString,). I cannot keep complete table content in memory , so I want to creat my own RDD to handle it. Thanks Shushant
Re: Spark driver using Spark Streaming shows increasing memory/CPU usage
Hi Tathagata, Thanks for your quick reply! I’ll add some more detail below about what I’m doing - I’ve tried a lot of variations on the code to debug this, with monitoring enabled, but I didn’t want to overwhelm the issue description to start with ;-) On 30 Jun 2015, at 19:30, Tathagata Das t...@databricks.commailto:t...@databricks.com wrote: Could you give more information on the operations that you are using? The code outline? And what do you mean by Spark Driver receiver events? If the driver is receiving events, how is it being sent to the executors. The events are just objects that represent actions a user takes. They contain a user id, a type and some other info, and get dumped into a MongoDB and then picked out by the Receiver. This ReceiverBSONObject runs a thread which periodically polls the db, processes new events into DBObjects and calls Receiver.store() to hand each one off to an Executor. BTW, for memory usages, I strongly recommend using jmap --histo:live to see what are the type of objects that is causing most memory usage? I’ve been running both jconsole and VisualVM to monitor the processes, and when memory usage is high it is overwhelmingly due to byte arrays. I’ve read that sometimes performing operations like sorting an RDD can lead to unreachable byte arrays (https://spark-project.atlassian.net/browse/SPARK-1001). I’ve not come across any reports that quite match our use case though. The groupByKey step seems to be a significant creator of byte arrays in my case. I’ll attach an outline of the code I’m using - I’ve tried to reduce this to the essentials; it won’t compile but should display ok in an IDE. SparkDriverOutline.java Description: SparkDriverOutline.java A note that ourMongoDBReceiver class uses StorageLevel.MEMORY_AND_DISK_SER() which will "spill partitions that don't fit in memory to disk… asserializedJava objects (one byte array per partition)”. I wondered if this might be a contributor to the problem, but our partitions are very small. Perhaps the partitions are not getting cleared up for some reason.Thanks again for taking this up. Spark Streaming has been very useful for us!NeilTDOn Tue, Jun 30, 2015 at 9:48 AM, easyonthemayo neil.m...@velocityww.com wrote:I have a Spark program which exhibits increasing resource usage. Spark Streaming (https://spark.apache.org/streaming/) is used to provide the data source. The Spark Driver class receives "events" by querying a MongoDB in a custom JavaReceiverInputDStream. These events are then transformed via mapToPair(), which creates tuples mapping an id to each event. The stream is partitioned and we run a groupByKey(). Finally the events are processed by foreachRDD(). Running it for several hours on a standalone cluster, a clear trend emerges of both CPU and heap memory usage increasing. This occurs even if the data source offers no events, so there is no actual processing to perform. Similarly, omitting the bulk of processing code within foreachRDD() does not eliminate the problem. I've tried eliminating steps in the process to identify the culprit, and it looks like it's the partitioning step that prompts the CPU usage to increase over time. Has anyone else experienced this sort of behaviour? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-driver-using-Spark-Streaming-shows-increasing-memory-CPU-usage-tp23545.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Convert CSV lines to List of Objects
Hi, How can I use the map function in Java to convert all the lines of a CSV file into a list of objects? Can someone please help...
JavaRDD<List<Charge>> rdd = sc.textFile("data.csv").map(new Function<String, List<Charge>>() {
    @Override
    public List<Charge> call(String s) {
        // parse the comma-separated line into Charge objects here
    }
});
Thanks,
Re: sparkR could not find function textFile
Hi, Piggybacking on this discussion. I'm trying to achieve the same, reading a csv file, from RStudio. Where I'm stuck is how to supply some additional package from RStudio to spark.init() as sparkR.init does() not provide an option to specify additional package. I tried following codefrom RStudio. It is giving me error Error in callJMethod(sqlContext, load, source, options) : Invalid jobj 1. If SparkR was restarted, Spark operations need to be re-executed. -- Sys.setenv(SPARK_HOME=C:\\spark-1.4.0-bin-hadoop2.6) .libPaths(c(file.path(Sys.getenv(SPARK_HOME), R, lib),.libPaths())) library(SparkR) sparkR.stop() sc - sparkR.init(master=local[2], sparkEnvir = list(spark.executor.memory=1G), sparkJars=C:\\spark-1.4.0-bin-hadoop2.6\\lib\\spark-csv_2.11-1.1.0.jar) /* I have downloaded this spark-csv jar and kept it in lib folder of Spark */ sqlContext - sparkRSQL.init(sc) plutoMN - read.df(sqlContext, C:\\Users\\Sourav\\Work\\SparkDataScience\\PlutoMN.csv, source = com.databricks.spark.csv). -- However, I also tried this from shell as 'sparkR --package com.databricks:spark-csv_2.11:1.1.0. This time I used the following code and it works all fine. sqlContext - sparkRSQL.init(sc) plutoMN - read.df(sqlContext, C:\\Users\\Sourav\\Work\\SparkDataScience\\PlutoMN.csv, source = com.databricks.spark.csv). Any idea how to achieve the same from RStudio ? Regards, On Thu, Jun 25, 2015 at 2:38 PM, Wei Zhou zhweisop...@gmail.com wrote: I tried out the solution using spark-csv package, and it worked fine now :) Thanks. Yes, I'm playing with a file with all columns as String, but the real data I want to process are all doubles. I'm just exploring what sparkR can do versus regular scala spark, as I am by heart a R person. 2015-06-25 14:26 GMT-07:00 Eskilson,Aleksander alek.eskil...@cerner.com: Sure, I had a similar question that Shivaram was able fast for me, the solution is implemented using a separate DataBrick’s library. Check out this thread from the email archives [1], and the read.df() command [2]. CSV files can be a bit tricky, especially with inferring their schemas. Are you using just strings as your column types right now? Alek [1] -- http://apache-spark-developers-list.1001551.n3.nabble.com/CSV-Support-in-SparkR-td12559.html [2] -- https://spark.apache.org/docs/latest/api/R/read.df.html From: Wei Zhou zhweisop...@gmail.com Date: Thursday, June 25, 2015 at 4:15 PM To: shiva...@eecs.berkeley.edu shiva...@eecs.berkeley.edu Cc: Aleksander Eskilson alek.eskil...@cerner.com, user@spark.apache.org user@spark.apache.org Subject: Re: sparkR could not find function textFile Thanks to both Shivaram and Alek. Then if I want to create DataFrame from comma separated flat files, what would you recommend me to do? One way I can think of is first reading the data as you would do in r, using read.table(), and then create spark DataFrame out of that R dataframe, but it is obviously not scalable. 2015-06-25 13:59 GMT-07:00 Shivaram Venkataraman shiva...@eecs.berkeley.edu: The `head` function is not supported for the RRDD that is returned by `textFile`. You can run `take(lines, 5L)`. I should add a warning here that the RDD API in SparkR is private because we might not support it in the upcoming releases. So if you can use the DataFrame API for your application you should try that out. Thanks Shivaram On Thu, Jun 25, 2015 at 1:49 PM, Wei Zhou zhweisop...@gmail.com wrote: Hi Alek, Just a follow up question. 
This is what I did in sparkR shell: lines - SparkR:::textFile(sc, ./README.md) head(lines) And I am getting error: Error in x[seq_len(n)] : object of type 'S4' is not subsettable I'm wondering what did I do wrong. Thanks in advance. Wei 2015-06-25 13:44 GMT-07:00 Wei Zhou zhweisop...@gmail.com: Hi Alek, Thanks for the explanation, it is very helpful. Cheers, Wei 2015-06-25 13:40 GMT-07:00 Eskilson,Aleksander alek.eskil...@cerner.com: Hi there, The tutorial you’re reading there was written before the merge of SparkR for Spark 1.4.0 For the merge, the RDD API (which includes the textFile() function) was made private, as the devs felt many of its functions were too low level. They focused instead on finishing the DataFrame API which supports local, HDFS, and Hive/HBase file reads. In the meantime, the devs are trying to determine which functions of the RDD API, if any, should be made public again. You can see the rationale behind this decision on the issue’s JIRA [1]. You can still make use of those now private RDD functions by prepending the function call with the SparkR private namespace, for example, you’d use SparkR:::textFile(…). Hope that helps, Alek [1] -- https://issues.apache.org/jira/browse/SPARK-7230
Re: Need clarification on spark on cluster set up instruction
I have a similar use case, so I wrote a python script to fix the cluster configuration that spark-ec2 uses when you use Hadoop 2. Start a cluster with enough machines that the hdfs system can hold 1Tb (so use instance types that have SSDs), then follow the instructions at http://thousandfold.net/cz/2015/07/01/installing-spark-with-hadoop-2-using-spark-ec2/. Let me know if you have any issues. On Mon, Jun 29, 2015 at 4:32 PM, manish ranjan cse1.man...@gmail.com wrote: Hi All here goes my first question : Here is my use case I have 1TB data I want to process on ec2 using spark I have uploaded the data on ebs volume The instruction on amazon ec2 set up explains *If your application needs to access large datasets, the fastest way to do that is to load them from Amazon S3 or an Amazon EBS device into an instance of the Hadoop Distributed File System (HDFS) on your nodes* Now the new amazon instances don't have any physical volume http://aws.amazon.com/ec2/instance-types/ So do I need to do a set up for HDFS separately on ec2 (instruction also says The spark-ec2 script already sets up a HDFS instance for you) ? Any blog/write up which can help me understanding this better ? ~Manish
Re: output folder structure not getting commited and remains as _temporary
Looks like a jar conflict to me. ava.lang.NoSuchMethodException: org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData.getBytesWritten() You are having multiple versions of the same jars in the classpath. Thanks Best Regards On Wed, Jul 1, 2015 at 6:58 AM, nkd kalidas.nimmaga...@gmail.com wrote: I am running a spark application in standalone cluster on windows 7 environment. Following are the details. spark version = 1.4.0 Windows/Standalone mode built the Hadoop 2.6.0 on windows and set the env params like so HADOOP_HOME = E:\hadooptar260\hadoop-2.6.0 HADOOP_CONF_DIR =E:\hadooptar260\hadoop-2.6.0\etc\hadoop // where the core-site.xml resides added this to the path E:\hadooptar260\hadoop-2.6.0\bin Note: I am not starting Hadoop. Wanted to ensure that hadoop libraries are made available to Spark especially ensuringe hdsf.jar and haddop-common.jar are in classpath and winutils in system path @rem startMaster spark-class2.cmd org.apache.spark.deploy.master.Master --host machine1.QQQ.HYD --port 7077 @rem startWorker.This worker runs on the same machine as the master spark-class2.cmd org.apache.spark.deploy.worker.Worker spark://machine1.QQQ.HYD:7077 @rem startWorker.This worker runs on a second machine spark-class2.cmd org.apache.spark.deploy.worker.Worker spark://machine1.QQQ.HYD:7077 @rem startApp.This command is run from the machine where master and first worker are running spark-submit2 --verbose --jars /app/lib/ojdbc7.jar --driver-class-path /app/lib/ojdbc7.jar --driver-library-path /programfiles/Hadoop/hadooptar260/hadoop-2.6.0/bin --class org.ETLProcess --name MyETL --master spark://machine1.QQQ.HYD:7077 --deploy-mode client /app/appjar/myapp-0.1.0.jar ETLProcess 1 51 @rem to avoid the NoSuchmethodException, tried the following spark-submit2 --verbose --jars /app/lib/ojdbc7.jar,/app/lib/hadoop-common-2.6.0.jar,/app/lib/hadoop-hdfs-2.6.0.jar --driver-class-path /app/lib/ojdbc7.jar --driver-library-path /programfiles/Hadoop/hadooptar260/hadoop-2.6.0/bin --class org.dwh.oem.transform.ETLProcess --name SureETL --master spark://machine1.QQQ.HYD:7077 --deploy-mode client /app/appjar/myapp-0.1.0.jar ETLProcess 1 51 The above the ETL job is completing successfully by fetching the data from db and storing as json files on each of the worker nodes. *In the first node the files are proprly getting commited and I could see the removal of _temporary folder and marking it as -SUCCESS* *The issue is, files in the second node remain in the _temporary folder making them as not usable for further jobs. Help required to overcome this this issue* * This is line 176 from SparkHadoopUtil.scala where the below excetion is occurring * private def getFileSystemThreadStatistics(): Seq[AnyRef] = { val stats = FileSystem.getAllStatistics() * stats.map(Utils.invoke(classOf[Statistics], _, getThreadStatistics)) *= Line 176 } Following are the extracts from the log which also contains the below exceptions: java.lang.NoSuchMethodException: org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData.getBytesWritten() java.lang.ClassNotFoundException: org.apache.hadoop.mapred.InputSplitWithLocationInfo java.lang.NoSuchMethodException: org.apache.hadoop.fs.FileSystem$Statistics.getThreadStatistics() --- 2015-06-30 15:55:48 DEBUG NativeCodeLoader:46 - Trying to load the custom-built native-hadoop library... 
2015-06-30 15:55:48 DEBUG NativeCodeLoader:50 - Loaded the native-hadoop library 2015-06-30 15:55:48 DEBUG JniBasedUnixGroupsMapping:50 - Using JniBasedUnixGroupsMapping for Group resolution 2015-06-30 15:55:48 DEBUG JniBasedUnixGroupsMappingWithFallback:44 - Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMapping 2015-06-30 15:55:48 DEBUG Groups:80 - Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; cacheTimeout=30; warningDeltaMs=5000 2015-06-30 15:55:48 DEBUG UserGroupInformation:193 - hadoop login 2015-06-30 15:55:48 DEBUG UserGroupInformation:142 - hadoop login commit --- 2015-06-30 15:55:50 DEBUG Master:56 - [actor] received message RegisterApplication(ApplicationDescription(SureETL)) from Actor[akka.tcp://sparkDriver@172.16.11.212:59974/user/$a#-1360185865] 2015-06-30 15:55:50 INFO Master:59 - Registering app SureETL 2015-06-30 15:55:50 INFO Master:59 - Registered app SureETL with ID app-2015063010-0001 2015-06-30 15:55:50 INFO Master:59 - Launching executor app-2015063010-0001/0 on worker worker-20150630154548-172.16.11.212-59791 2015-06-30 15:55:50 INFO Master:59 - Launching executor app-2015063010-0001/1 on worker worker-20150630155002-172.16.11.133-61908 2015-06-30 15:55:50 DEBUG Master:62 - [actor] handled message (8.672752 ms) RegisterApplication(ApplicationDescription(SureETL)) from
upload to s3, UI Total Duration and Sum of Job Durations
Hi, Our job is reading files from s3, transforming/aggregating them and writing them back to s3. While investigating performance problems I've noticed that there is a big difference between the sum of job durations and the Total Duration which appears in the UI. After investigating it a bit, the difference is caused by Spark not counting the time it takes to upload file parts to s3 within the job duration metric. IMHO the job is not finished yet (since it hasn't finished uploading parts), while in Spark I can see Succeeded/Total 256/256 (i.e. everything is done). Is there any possibility to see how long it takes to upload the files? Are there any plans to show network time? Why is the job marked as finished while the upload is still in progress? We are using s3a, hadoop 2.7, spark 1.3.1. thanks in advance, Igor -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/upload-to-s3-UI-Total-Duration-and-Sum-of-Job-Durations-tp23563.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Issue with parquet write after join (Spark 1.4.0)
It says: Caused by: java.net.ConnectException: Connection refused: slave2/...:54845 Could you look in the executor logs (stderr on slave2) and see what made it shut down? Since you are doing a join there's a high possibility of OOM etc. Thanks Best Regards On Wed, Jul 1, 2015 at 10:20 AM, Pooja Jain pooja.ja...@gmail.com wrote: Hi, We are using Spark 1.4.0 on hadoop using yarn-cluster mode via spark-submit. We are facing a parquet write issue after doing dataframe joins. We have a full data set and then incremental data. We are reading them as dataframes, joining them, and then writing the data to the hdfs system in parquet format. We are getting the timeout error on the last partition. But if we do a count on the joined data it is working - which gives us the confidence that the join is happening properly. Only in the case of writing to hdfs is it timing out. Code flow:

// join two data frames - dfBase and dfIncr on primaryKey
val joinedDF = dfBase.join(dfIncr, dfBase(primaryKey) === dfIncr(primaryKey), "outer")
// applying a reduce function on each row
val mergedDF = joinedDF.map(x => reduceFunc(x))
// converting back to dataframe
val newdf = Spark.getSqlContext().createDataFrame(mergedDF, dfSchema)
// writing to parquet file
newdf.write.parquet(hdfsfilepath)

Getting following exception: 15/06/30 22:47:04 WARN spark.HeartbeatReceiver: Removing executor 26 with no recent heartbeats: 255766 ms exceeds timeout 24 ms 15/06/30 22:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 26 on slave2: Executor heartbeat timed out after 255766 ms 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 from TaskSet 7.0 15/06/30 22:47:04 WARN scheduler.TaskSetManager: Lost task 6.0 in stage 7.0 (TID 216, slave2): ExecutorLostFailure (executor 26 lost) 15/06/30 22:47:04 INFO scheduler.TaskSetManager: Starting task 6.1 in stage 7.0 (TID 310, slave2, PROCESS_LOCAL, 1910 bytes) 15/06/30 22:47:04 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 3) 15/06/30 22:47:04 INFO cluster.YarnClusterSchedulerBackend: Requesting to kill executor(s) 26 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 26 from BlockManagerMaster. 15/06/30 22:47:04 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(26, slave2, 54845) 15/06/30 22:47:04 INFO storage.BlockManagerMaster: Removed 26 successfully in removeExecutor 15/06/30 22:47:04 INFO yarn.YarnAllocator: Driver requested a total number of 26 executor(s). 15/06/30 22:47:04 INFO scheduler.ShuffleMapStage: ShuffleMapStage 6 is now unavailable on executor 26 (193/200, false) 15/06/30 22:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver requested to kill executor(s) 26. 15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. slave2:51849 15/06/30 22:47:06 ERROR cluster.YarnClusterScheduler: Lost executor 26 on slave2: remote Rpc client disassociated 15/06/30 22:47:06 INFO scheduler.TaskSetManager: Re-queueing tasks for 26 from TaskSet 7.0 15/06/30 22:47:06 INFO scheduler.DAGScheduler: Executor lost: 26 (epoch 5) 15/06/30 22:47:06 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 26 from BlockManagerMaster. 15/06/30 22:47:06 INFO storage.BlockManagerMaster: Removed 26 successfully in removeExecutor 15/06/30 22:47:06 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@slave2:51849] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
15/06/30 22:47:06 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. slave2:51849 15/06/30 22:47:21 WARN scheduler.TaskSetManager: Lost task 6.1 in stage 7.0 (TID 310, slave2): org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:161) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132) at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:132) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.spark.shuffle.FetchFailedException: Failed to connect to slave2/...:54845 at
Can I do Joins across Event Streams ?
Hi, I have to build a system that reacts to a set of events. Each of these events is a separate stream by itself, consumed from a different Kafka topic, and hence will have a different InputDStream. Questions: Will I be able to do joins across multiple InputDStreams and collate the output using a single Accumulator? These event streams can have their own frequency of occurrence. How will I be able to co-ordinate the out-of-sync behaviour?
coalesce on dataFrame
How can we use coalesce(1, true) on dataFrame? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-on-dataFrame-tp23564.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
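A minimal sketch of the options, assuming Spark 1.4 or later, where DataFrame has coalesce(n) but no two-argument (n, shuffle) variant; repartition(n) is the shuffling equivalent of the RDD-style coalesce(1, true):

```scala
val singlePartition = df.coalesce(1)     // narrow dependency, no shuffle
val singleShuffled  = df.repartition(1)  // full shuffle, like RDD coalesce(1, true)

// if the two-argument RDD form is really needed, drop to the RDD and rebuild the DataFrame
val viaRdd = sqlContext.createDataFrame(df.rdd.coalesce(1, shuffle = true), df.schema)
```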
Re: Spark run errors on Raspberry Pi
Now I'm having a strange feeling to try this on KBOX http://kevinboone.net/kbox.html :/ Thanks Best Regards On Wed, Jul 1, 2015 at 9:10 AM, Exie tfind...@prodevelop.com.au wrote: FWIW, I had some trouble getting Spark running on a Pi. My core problem was using snappy for compression, as it comes as a pre-made binary for i386 and I couldn't find one for ARM. To work around it there was an option to use LZO instead, and then everything worked. Off the top of my head, it was something like: spark.sql.parquet.compression.codec=lzo This might be worth trying. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-run-errors-on-Raspberry-Pi-tp23532p23561.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
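A small sketch of the setting mentioned above, either in code or in spark-defaults.conf; the documented codec values in this Spark version are uncompressed, snappy, gzip and lzo, and switching away from snappy sidesteps its x86-only native library:

```scala
sqlContext.setConf("spark.sql.parquet.compression.codec", "lzo")
// or equivalently in conf/spark-defaults.conf:
//   spark.sql.parquet.compression.codec  lzo
```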
Re: Run multiple Spark jobs concurrently
Have a look at https://spark.apache.org/docs/latest/job-scheduling.html Thanks Best Regards On Wed, Jul 1, 2015 at 12:01 PM, Nirmal Fernando nir...@wso2.com wrote: Hi All, Is there any additional configs that we have to do to perform $subject? -- Thanks regards, Nirmal Associate Technical Lead - Data Technologies Team, WSO2 Inc. Mobile: +94715779733 Blog: http://nirmalfdo.blogspot.com/
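A short sketch of the two levels the linked page covers; app name, core counts and paths are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// 1) across applications (standalone mode): cap what each app grabs so others can run
//    e.g. spark-submit --conf spark.cores.max=4 --conf spark.executor.memory=4g ...
// 2) within one application: enable the fair scheduler, then submit actions from
//    separate threads so they share executors instead of queueing FIFO
val conf = new SparkConf().setAppName("multi-job").set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

new Thread(new Runnable {
  def run(): Unit = sc.textFile("/data/a").count()
}).start()
new Thread(new Runnable {
  def run(): Unit = sc.textFile("/data/b").count()
}).start()
```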
Re: Can Dependencies Be Resolved on Spark Cluster?
Thanks for the enlightening solution! On Wed, Jul 1, 2015 at 12:03 AM Burak Yavuz brk...@gmail.com wrote: Hi, In your build.sbt file, list all the dependencies you have (hopefully they're not too many, they just have a lot of transitive dependencies), for example: ``` libraryDependencies += "org.apache.hbase" % "hbase" % "1.1.1" libraryDependencies += "junit" % "junit" % "x" resolvers += "Some other repo" at "http://some.other.repo" resolvers += "Some other repo2" at "http://some.other.repo2" ``` call `sbt package`, and then run spark-submit as: $ bin/spark-submit --packages org.apache.hbase:hbase:1.1.1,junit:junit:x --repositories http://some.other.repo,http://some.other.repo2 $YOUR_JAR Best, Burak On Mon, Jun 29, 2015 at 11:33 PM, SLiZn Liu sliznmail...@gmail.com wrote: Hi Burak, Is the `--packages` flag only available for maven, with no sbt support? On Tue, Jun 30, 2015 at 2:26 PM Burak Yavuz brk...@gmail.com wrote: You can pass `--packages your:comma-separated:maven-dependencies` to spark-submit if you have Spark 1.3 or greater. Best regards, Burak On Mon, Jun 29, 2015 at 10:46 PM, SLiZn Liu sliznmail...@gmail.com wrote: Hey Spark Users, I'm writing a demo with Spark and HBase. What I've done is packaging a **fat jar**: place dependencies in `build.sbt`, and use `sbt assembly` to package **all dependencies** into one big jar. The rest of the work is to copy the fat jar to the Spark master node and then launch it with `spark-submit`. The defect of the fat-jar fashion is obvious: all dependencies are packed, yielding a huge jar file. Even worse, in my case, a vast number of the conflicting package files in `~/.ivy/cache` fail when merging; I had to manually specify `MergeStrategy` as `rename` for all conflicting files to bypass this issue. Then I thought there should exist an easier way to submit a thin jar with a build.sbt-like file specifying dependencies, so that dependencies are automatically resolved across the cluster before the actual job is launched. I googled, but nothing related was found. Is this plausible, or are there other better ways to achieve the same goal? BEST REGARDS, Todd Leo
Re: Can I do Joins across Event Streams ?
Have a look at the window and updateStateByKey operations. If you are looking for something more sophisticated, you can persist these streams in intermediate storage (say for x duration) like HBase or Cassandra or any other DB and do global aggregations with these. Thanks Best Regards On Wed, Jul 1, 2015 at 1:06 PM, Spark Enthusiast sparkenthusi...@yahoo.in wrote: Hi, I have to build a system that reacts to a set of events. Each of these events is a separate stream by itself, consumed from a different Kafka topic, and hence will have a different InputDStream. Questions: Will I be able to do joins across multiple InputDStreams and collate the output using a single Accumulator? These event streams can have their own frequency of occurrence. How will I be able to co-ordinate the out-of-sync behaviour?
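For the join part of the question, a minimal sketch of what a windowed join across two InputDStreams can look like with the Spark Streaming 1.4 API. The topic names, the zkQuorum/ssc values, the extractId helper and the window sizes are illustrative assumptions, not from the thread; each receiver still needs its own core.

```scala
import org.apache.spark.streaming.{Minutes, Seconds}
import org.apache.spark.streaming.kafka.KafkaUtils

// two independent receivers, one per topic
val ordersRaw   = KafkaUtils.createStream(ssc, zkQuorum, "joiner", Map("orders" -> 1))
val paymentsRaw = KafkaUtils.createStream(ssc, zkQuorum, "joiner", Map("payments" -> 1))

// key both streams by a shared id, and widen the window so records that arrive
// out of sync with each other can still meet inside the same join
val orders   = ordersRaw.map   { case (_, v) => (extractId(v), v) }.window(Minutes(5), Seconds(30))
val payments = paymentsRaw.map { case (_, v) => (extractId(v), v) }.window(Minutes(5), Seconds(30))

val joined = orders.join(payments)   // DStream[(id, (order, payment))]
joined.foreachRDD(rdd => println("matched in this batch: " + rdd.count()))
```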
Spark program running infinitely
for (i <- 0 until distUsers.length) {
  val subsetData = sqlContext.sql("SELECT bidder_id, t.auction, time from BidsTable b inner join (select distinct auction from BidsTable where bidder_id='" + distUsers(i) + "') t on t.auction=b.auction order by t.auction, time").map(x => (x(0), x(1), x(2)))
  val withIndex = subsetData.zipWithIndex
  val indexKey = withIndex.map { case (k, v) => (v, k) }
  indexKey.cache
  val timeDiff = new ListBuffer[Long]()
  for (j <- 1 until subsetData.count().toInt) {
    var current_auction = indexKey.lookup(j).map(_._2).head.toString()
    var past_auction = indexKey.lookup(j - 1).map(_._2).head.toString()
    var current = indexKey.lookup(j).map(_._1).head.toString()
    var past = indexKey.lookup(j - 1).map(_._1).head.toString()
    if (current.toString != past.toString && current.toString == distUsers(0) && current_auction == past_auction) {
      var current_time = indexKey.lookup(9).map(_._3).head.toString()
      var past_time = indexKey.lookup(0).map(_._3).head.toString()
      timeDiff += current_time.toLong - past_time.toLong
    }
  }
  bidder_timeDiff += ((distUsers(0).toString, timeDiff.min.toString, timeDiff.min.toString))
}
Above is running infinitely... Kindly help on this. Regards, Ladle -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-program-running-infinitely-tp23565.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
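A sketch of the usual fix: every indexKey.lookup(...) call above launches a full Spark job, and the inner loop issues four of them per row, so the program crawls rather than hangs. Collecting the already filtered, ordered subset once and looping locally avoids that; the comparison below is simplified relative to the original condition, and timeDiff is assumed to be the ListBuffer from the snippet above.

```scala
val rows = subsetData.collect()   // Array[(Any, Any, Any)] in (bidder, auction, time) order
for (j <- 1 until rows.length) {
  val (pastBidder, pastAuction, pastTime) = rows(j - 1)
  val (curBidder, curAuction, curTime) = rows(j)
  if (curBidder != pastBidder && curAuction == pastAuction) {
    timeDiff += curTime.toString.toLong - pastTime.toString.toLong
  }
}
```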
Re: DataFrame Filter Inside Another Data Frame Map
Collecting it as a regular (Java/Scala/Python) map. You can also broadcast the map if you're going to use it multiple times. On Wednesday, July 1, 2015, Ashish Soni asoni.le...@gmail.com wrote: Thanks, So if i load some static data from a database and then i need to use that in my map function to filter records, what will be the best way to do it? Ashish On Wed, Jul 1, 2015 at 10:45 PM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: You cannot refer to one rdd inside another rdd.map function... The Rdd object is not serializable. Whatever objects you use inside a map function should be serializable as they get transferred to executor nodes. On Jul 2, 2015 6:13 AM, Ashish Soni asoni.le...@gmail.com wrote: Hi All, I am not sure what is wrong with the below code, as it gives the below error when i access it inside the map but it works outside: JavaRDD<Charge> rdd2 = rdd.map(new Function<Charge, Charge>() { @Override public Charge call(Charge ch) throws Exception { * DataFrame df = accountRdd.filter("login=test"); * return ch; } }); 5/07/01 20:38:08 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:129) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154) -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau
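A sketch of that approach in Java, to match the thread. The column names "login" and "userid", the Charge.getLogin() accessor and the JavaSparkContext named sc are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Row;

// 1) collect the lookup table once on the driver
Map<String, Long> logins = new HashMap<String, Long>();
for (Row r : accountRdd.select("login", "userid").collect()) {
    logins.put(r.getString(0), r.getLong(1));
}

// 2) broadcast the plain map; it is serializable, unlike the DataFrame
final Broadcast<Map<String, Long>> loginsBc = sc.broadcast(logins);

// 3) look values up inside the map function instead of touching the DataFrame there
JavaRDD<Charge> rdd2 = rdd.map(new Function<Charge, Charge>() {
    @Override
    public Charge call(Charge ch) throws Exception {
        Long userId = loginsBc.value().get(ch.getLogin());
        // ... use userId to enrich or filter the record ...
        return ch;
    }
});
```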
Error with splitting contents of a dataframe column using Spark 1.4 for nested complex json file
Hello, I am having issues with splitting the contents of a dataframe column using Spark 1.4. The dataframe was created by reading a nested complex json file. I used df.explode but keep getting an error message. The json file format looks like:

[ { "neid":{ }, "mi":{ "mts":"20100609071500Z", "gp":"900", "tMOID":"Aal2Ap", "mt":[ ], "mv":[ { "moid":"ManagedElement=1,TransportNetwork=1,Aal2Sp=1,Aal2Ap=r1552q", "r": [ 1, 2, 5 ] }, { "moid":"ManagedElement=1,TransportNetwork=1,Aal2Sp=1,Aal2Ap=r1542q", "r": [ 1, 2, 5 ] } ] } }, { "neid":{ "neun":"RC003", "nedn":"SubNetwork=ONRM_RootMo_R,SubNetwork=RC003,MeContext=RC003", "nesw":"CP90831_R9YC/11" }, "mi":{ "mts":"20100609071500Z", "gp":"900", "tMOID":"PlugInUnit", "mt":"pmProcessorLoad", "mv":[ { "moid":"ManagedElement=1,Equipment=1,Subrack=MS,Slot=6,PlugInUnit=1", "r": [ 1, 2, 5 ] }, { "moid":"ManagedElement=1,Equipment=1,Subrack=ES-1,Slot=1,PlugInUnit=1", "r": [ 1, 2, 5 ] } ] } } ]

scala> val df = sqlContext.read.json("/Users/xx/target/statsfile.json")
scala> df.show()
| mi|neid|
|[900,[pmEs,pmS...|[SubNetwork=ONRM_...|
|[900,[pmIcmpInEr...|[SubNetwork=ONRM_...|
|[900,pmUnsuccessf...|[SubNetwork=ONRM_...|
|[900,[pmBwErrBlo...|[SubNetwork=ONRM_...|
|[900,[pmSctpStat...|[SubNetwork=ONRM_...|
|[900,[pmLinkInSe...|[SubNetwork=ONRM_...|
|[900,[pmGrFc,p...|[SubNetwork=ONRM_...|
|[900,[pmReceived...|[SubNetwork=ONRM_...|
|[900,[pmIvIma,...|[SubNetwork=ONRM_...|
|[900,[pmEs,pmS...|[SubNetwork=ONRM_...|
|[900,[pmEs,pmS...|[SubNetwork=ONRM_...|
|[900,[pmExisOrig...|[SubNetwork=ONRM_...|
|[900,[pmHDelayVa...|[SubNetwork=ONRM_...|
|[900,[pmReceived...|[SubNetwork=ONRM_...|
|[900,[pmReceived...|[SubNetwork=ONRM_...|
|[900,[pmAverageR...|[SubNetwork=ONRM_...|
|[900,[pmDchFrame...|[SubNetwork=ONRM_...|
|[900,[pmReceived...|[SubNetwork=ONRM_...|
|[900,[pmNegative...|[SubNetwork=ONRM_...|
|[900,[pmUsedTbsQ...|[SubNetwork=ONRM_...|

scala> df.printSchema()
root
 |-- mi: struct (nullable = true)
 |    |-- gp: long (nullable = true)
 |    |-- mt: string (nullable = true)
 |    |-- mts: string (nullable = true)
 |    |-- mv: string (nullable = true)
 |-- neid: struct (nullable = true)
 |    |-- nedn: string (nullable = true)
 |    |-- nesw: string (nullable = true)
 |    |-- neun: string (nullable = true)

scala> val df1 = df.select("mi.mv")
df1: org.apache.spark.sql.DataFrame = [mv: string]
scala> val df1 = df.select("mi.mv").show()
| mv|
|[{r:[0,0,0],mo...|
|{r:[0,4,0,4],m...|
|{r:5,moid:Ma...|
|[{r:[2147483647...|
|{r:[225,1112986...|
|[{r:[83250,0,0,...|
|[{r:[1,2,529982...|
|[{r:[26998564,0...|
|[{r:[0,0,0,0,0,...|
|[{r:[0,0,0],mo...|
|[{r:[0,0,0],mo...|
|{r:[0,0,0,0,0,0...|
|{r:[0,0,1],moi...|
|{r:[4587,4587],...|
|[{r:[180,180],...|
|[{r:[0,0,0,0,0...|
|{r:[0,35101,0,0...|
|[{r:[0,0,0,0,0...|
|[{r:[0,1558],m...|
|[{r:[7484,4870...|

scala> df1.explode("mv", "mvnew")(mv => mv.split(","))
<console>:28: error: value split is not a member of Nothing
df1.explode("mv", "mvnew")(mv => mv.split(","))

Am i doing something wrong? I need to extract the data under mi.mv into separate columns so i can apply some transformations. Regards Mike
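The compile error itself is just type inference: with an unannotated closure parameter, explode[A, B] infers A as Nothing. A minimal sketch of the annotated call (whether splitting the raw JSON string on commas is the right parsing strategy is a separate question):

```scala
// annotate the input type so explode[A, B] can infer A = String
val exploded = df.select("mi.mv").explode("mv", "mvnew")((mv: String) => mv.split(","))
exploded.printSchema()   // mvnew: string, one row per comma-separated fragment
```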
getting WARN ReliableDeliverySupervisor
Hi Expert, Hadoop version: 2.4 Spark version: 1.3.1 I am running the SparkPi example application. bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --executor-memory 2G lib/spark-examples-1.3.1-hadoop2.4.0.jar 2 The same command sometimes gets WARN ReliableDeliverySupervisor, sometimes does not. Some runs are successful even with the WARN bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --executor-memory 2G lib/spark-examples-1.3.1-hadoop2.4.0.jar 1 15/07/02 04:38:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Pi is roughly 3.141633956 bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --executor-memory 2G lib/spark-examples-1.3.1-hadoop2.4.0.jar 2 15/07/02 05:17:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/07/02 05:17:53 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkYarnAM@hostname:32544] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. 15/07/02 05:18:01 ERROR YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED! Exception in thread main java.lang.NullPointerException at org.apache.spark.SparkContext.init(SparkContext.scala:544) at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:28) at org.apache.spark.examples.SparkPi.main(SparkPi.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --executor-memory 2G lib/spark-examples-1.3.1-hadoop2.4.0.jar 1 15/07/02 05:23:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/07/02 05:24:09 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkYarnAM@hostname:15959] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. Pi is roughly 3.141625776 Also, the spark ui only available when I set --master to local. What could have caused those issues ? Thanks, Xiaohe
Spark on Hadoop 2.5.2
Hi guys, I was trying to deploy the SparkSQL thrift server on Hadoop 2.5.2 with Kerberos / Hive 0.13. It seems I hit the problem below when I tried to start the thrift server. java.lang.NoSuchFieldError: SASL_PROPS at org.apache.hadoop.hive.thrift.HadoopThriftAuthBridge20S.getHadoopSaslProperties(HadoopThriftAuthBridge20S.java:126) at org.apache.hive.service.auth.HiveAuthFactory.getSaslProperties(HiveAuthFactory.java:116) at org.apache.hive.service.auth.HiveAuthFactory.getAuthTransFactory(HiveAuthFactory.java:133) at org.apache.hive.service.cli.thrift.ThriftBinaryCLIService.run(ThriftBinaryCLIService.java:43) I searched a little bit and it seems related to https://issues.apache.org/jira/browse/HIVE-7620 I was trying to make my Spark pull in this change but found Spark is using a Hive build of its own, with groupId org.spark-project.hive instead of org.apache.hive. Where can I find the source code of this specific version of Hive? Or is there any other way around this? Thanks. Shawn
Re: DataFrame Find/Filter Based on Input - Inside Map function
You can directly use filter on a Dataframe On 2 Jul 2015 12:15, Ashish Soni asoni.le...@gmail.com wrote: Hi All , I have an DataFrame Created as below options.put(dbtable, (select * from user) as account); DataFrame accountRdd = sqlContext.read().format(jdbc).options(options).load(); and i have another RDD which contains login name and i want to find the userid from above DF RDD and return it Not sure how can i do that as when i apply a map function and say filter on DF i get Null pointor exception. Please help.
Re: BroadCast Multiple DataFrame ( JDBC Tables )
I am not sure if you can broadcast a data frame without collecting it on the driver... On Jul 1, 2015 11:45 PM, Ashish Soni asoni.le...@gmail.com wrote: Hi, I need to load 10 tables in memory and have them available to all the workers. Please let me know what is the best way to broadcast them; sc.broadcast(df) allows only one. Thanks,
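A sketch of two workarounds, given that a DataFrame itself cannot be broadcast; the key choice (first column) and the threshold value are placeholders:

```scala
// 1) for pure lookup tables: collect each one and broadcast the result, keyed here by the first column
val lookup = sc.broadcast(df.collect().map(r => r.getString(0) -> r).toMap)

// 2) for join inputs: keep the small tables under the auto-broadcast-join threshold (bytes)
//    so Spark SQL ships them to every executor during the join
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)
```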
Re: making dataframe for different types using spark-csv
hi Mohammed Guller! How can I specify the schema in the load method? On Thu, Jul 2, 2015 at 6:43 AM, Mohammed Guller moham...@glassbeam.com wrote: Another option is to provide the schema to the load method. One variant of sqlContext.load takes a schema as an input parameter. You can define the schema programmatically as shown here: https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema Mohammed *From:* Krishna Sankar [mailto:ksanka...@gmail.com] *Sent:* Wednesday, July 1, 2015 3:09 PM *To:* Hafiz Mujadid *Cc:* user@spark.apache.org *Subject:* Re: making dataframe for different types using spark-csv · use .cast(...).alias('...') after the DataFrame is read. · sql.functions.udf for any domain-specific conversions. Cheers k/ On Wed, Jul 1, 2015 at 11:03 AM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: Hi experts! I am using spark-csv to load csv data into a dataframe. By default it makes the type of each column a string. Is there some way to get a dataframe with the actual types like int, double etc.? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/making-dataframe-for-different-types-using-spark-csv-tp23570.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Regards: HAFIZ MUJADID
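A sketch of what the suggested approach looks like with the spark-csv data source and the DataFrameReader API in Spark 1.4; the column names, types and file path are made up for illustration:

```scala
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("name", StringType, nullable = true),
  StructField("amount", DoubleType, nullable = true)))

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .schema(schema)               // columns come back typed instead of all strings
  .option("header", "true")
  .load("/path/to/file.csv")
```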
Re: DataFrame Filter Inside Another Data Frame Map
Any example how can i return a Hashmap from data frame ? Thanks , Ashish On Jul 1, 2015, at 11:34 PM, Holden Karau hol...@pigscanfly.ca wrote: Collecting it as a regular (Java/scala/Python) map. You can also broadcast the map if your going to use it multiple times. On Wednesday, July 1, 2015, Ashish Soni asoni.le...@gmail.com wrote: Thanks , So if i load some static data from database and then i need to use than in my map function to filter records what will be the best way to do it, Ashish On Wed, Jul 1, 2015 at 10:45 PM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: You cannot refer to one rdd inside another rdd.map function... Rdd object is not serialiable. Whatever objects you use inside map function should be serializable as they get transferred to executor nodes. On Jul 2, 2015 6:13 AM, Ashish Soni asoni.le...@gmail.com wrote: Hi All , I am not sure what is the wrong with below code as it give below error when i access inside the map but it works outside JavaRDDCharge rdd2 = rdd.map(new FunctionCharge, Charge() { @Override public Charge call(Charge ch) throws Exception { DataFrame df = accountRdd.filter(login=test); return ch; } }); 5/07/01 20:38:08 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at org.apache.spark.sql.DataFrame.init(DataFrame.scala:129) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154) -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau
Re: DataFrame Find/Filter Based on Input - Inside Map function
I need to pass the value of the filter dynamically like where id=someVal and that someVal exist in another RDD. How can I do this across JavaRDD and DataFrame ? Sent from my iPad On Jul 2, 2015, at 12:49 AM, ayan guha guha.a...@gmail.com wrote: You can directly use filter on a Dataframe On 2 Jul 2015 12:15, Ashish Soni asoni.le...@gmail.com wrote: Hi All , I have an DataFrame Created as below options.put(dbtable, (select * from user) as account); DataFrame accountRdd = sqlContext.read().format(jdbc).options(options).load(); and i have another RDD which contains login name and i want to find the userid from above DF RDD and return it Not sure how can i do that as when i apply a map function and say filter on DF i get Null pointor exception. Please help.
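A sketch in Java, matching the thread: pull the small set of login names to the driver and turn it into a SQL IN predicate; for a large set, converting the RDD to a DataFrame and joining is the usual alternative. loginRdd and the column name are assumptions.

```java
import java.util.List;
import org.apache.spark.sql.DataFrame;

List<String> logins = loginRdd.collect();   // JavaRDD<String>, assumed small
StringBuilder in = new StringBuilder();
for (String l : logins) {
    if (in.length() > 0) in.append(",");
    in.append("'").append(l).append("'");
}
DataFrame matched = accountRdd.filter("login IN (" + in + ")");
```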
Class not found error in spark console with a newly created HiveContext
All, I am using spark console 1.4.0 to do some tests. When I create a new HiveContext (line 18 in the code) in my test function, it always throws an exception like the one below (it works in spark console 1.3.0), but if I remove the HiveContext (line 18 in the code) from my function, it works fine. Any idea what's wrong with this? java.lang.ClassNotFoundException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$streamingTest$1$$anonfun$apply$1 at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.spark.util.InnerClosureFinder$$anon$4.visitMethodInsn(ClosureCleaner.scala:455) at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source) at com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.accept(Unknown Source) at org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:101) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:197) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132) at org.apache.spark.SparkContext.clean(SparkContext.scala:1891) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply$mcV$sp(DStream.scala:630) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:629) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109) at org.apache.spark.SparkContext.withScope(SparkContext.scala:681) at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:258) at org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:629) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.streamingTest(<console>:98) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:93) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:98)

1  import org.apache.spark._
2  import org.apache.spark.SparkContext._
3  import org.apache.spark.streaming.{ StreamingContext, Seconds, Minutes, Time }
4  import org.apache.spark.streaming.StreamingContext._
5  import org.apache.spark.rdd.RDD
6  import org.apache.spark.streaming.dstream.DStream
7  import org.apache.spark.HashPartitioner
8  import org.apache.spark.storage.StorageLevel
9  import org.apache.spark.sql._
10 import org.apache.spark.sql.hive._
11 import scala.collection.mutable.{Queue}
12 import scala.concurrent.Future
13 import scala.concurrent.ExecutionContext.Implicits.global
14
15 def streamingTest(args: Array[String]) {
16   println(" create streamingContext.")
17   val ssc = new StreamingContext(sc, Seconds(1))
18   val sqlContext2 = new HiveContext(sc)
19
20   val accum = sc.accumulator(0, "End Accumulator")
21   val queue = scala.collection.mutable.Queue(sc.textFile("G:/pipe/source"))
22   val textSource = ssc.queueStream(queue, true)
23   textSource.foreachRDD(rdd => { rdd.foreach( item => { accum += 1 } ) })
24   textSource.foreachRDD(rdd => {
25     var sample = rdd.take(10)
26     if (sample.length > 0) {
27       sample.foreach(item => println("#= " + item))
28     }
29   })
30   println(" Start streaming context.")
31   ssc.start()
32   val stopFunc = Future { var isRun = true; var duration = 0; while (isRun) { Thread.sleep(1000); duration += 1; if (accum.value > 0 || duration >= 120) { println("### STOP SSC ###"); ssc.stop(false, true); duration = 0; isRun = false } } }
33   ssc.awaitTermination()
34   println(" Streaming context terminated.")
35 }
36
37 streamingTest(null)
38

Thanks Terry
RE: making dataframe for different types using spark-csv
Another option is to provide the schema to the load method. One variant of sqlContext.load takes a schema as an input parameter. You can define the schema programmatically as shown here: https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema Mohammed From: Krishna Sankar [mailto:ksanka...@gmail.com] Sent: Wednesday, July 1, 2015 3:09 PM To: Hafiz Mujadid Cc: user@spark.apache.org Subject: Re: making dataframe for different types using spark-csv · use .cast(...).alias('...') after the DataFrame is read. · sql.functions.udf for any domain-specific conversions. Cheers k/ On Wed, Jul 1, 2015 at 11:03 AM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: Hi experts! I am using spark-csv to load csv data into a dataframe. By default it makes the type of each column a string. Is there some way to get a dataframe with the actual types like int, double etc.? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/making-dataframe-for-different-types-using-spark-csv-tp23570.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
DataFrame Find/Filter Based on Input - Inside Map function
Hi All, I have a DataFrame created as below: options.put("dbtable", "(select * from user) as account"); DataFrame accountRdd = sqlContext.read().format("jdbc").options(options).load(); and i have another RDD which contains the login name, and i want to find the userid from the above DF and return it. Not sure how i can do that, as when i apply a map function and say filter on the DF i get a null pointer exception. Please help.
Re: DataFrame Filter Inside Another Data Frame Map
You cannot refer to one rdd inside another rdd.map function... The Rdd object is not serializable. Whatever objects you use inside a map function should be serializable as they get transferred to executor nodes. On Jul 2, 2015 6:13 AM, Ashish Soni asoni.le...@gmail.com wrote: Hi All, I am not sure what is wrong with the below code, as it gives the below error when i access it inside the map but it works outside: JavaRDD<Charge> rdd2 = rdd.map(new Function<Charge, Charge>() { @Override public Charge call(Charge ch) throws Exception { * DataFrame df = accountRdd.filter("login=test"); * return ch; } }); 5/07/01 20:38:08 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:129) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
Re: DataFrame Filter Inside Another Data Frame Map
Thanks , So if i load some static data from database and then i need to use than in my map function to filter records what will be the best way to do it, Ashish On Wed, Jul 1, 2015 at 10:45 PM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: You cannot refer to one rdd inside another rdd.map function... Rdd object is not serialiable. Whatever objects you use inside map function should be serializable as they get transferred to executor nodes. On Jul 2, 2015 6:13 AM, Ashish Soni asoni.le...@gmail.com wrote: Hi All , I am not sure what is the wrong with below code as it give below error when i access inside the map but it works outside JavaRDDCharge rdd2 = rdd.map(new FunctionCharge, Charge() { @Override public Charge call(Charge ch) throws Exception { * DataFrame df = accountRdd.filter(login=test);* return ch; } }); 5/07/01 20:38:08 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at org.apache.spark.sql.DataFrame.init(DataFrame.scala:129) at org.apache.spark.sql.DataFrame.org $apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
Re: making dataframe for different types using spark-csv
- use .cast(...).alias('...') after the DataFrame is read. - sql.functions.udf for any domain-specific conversions. Cheers k/ On Wed, Jul 1, 2015 at 11:03 AM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: Hi experts! I am using spark-csv to lead csv data into dataframe. By default it makes type of each column as string. Is there some way to get dataframe of actual types like int,double etc.? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/making-dataframe-for-different-types-using-spark-csv-tp23570.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Subsecond queries possible?
I removed all of the indices from the table in IQ and the time went up to 700ms for the query on the full dataset. The best time I've got so far with Spark for the full dataset is 4s with a cached table and 30 cores. However, every column in IQ is automatically indexed by default http://infocenter.sybase.com/help/topic/com.sybase.infocenter.dc00170.1510/html/iqapgv1/BABHJCIC.htm, and those indexes you can't remove. They aren't even listed in the metadata. So even though I removed all of the indexes the default indexes are still there. It's a baseline but I'm really comparing apples and oranges right now. But it's an interesting experiment nonetheless. -- Eric On Wed, Jul 1, 2015 at 12:47 PM, Debasish Das debasish.da...@gmail.com wrote: If you take bitmap indices out of sybase then I am guessing spark sql will be at par with sybase ? On that note are there plans of integrating indexed rdd ideas to spark sql to build indices ? Is there a JIRA tracking it ? On Jun 30, 2015 7:29 PM, Eric Pederson eric...@gmail.com wrote: Hi Debasish: We have the same dataset running on SybaseIQ and after the caches are warm the queries come back in about 300ms. We're looking at options to relieve overutilization and to bring down licensing costs. I realize that Spark may not be the best fit for this use case but I'm interested to see how far it can be pushed. Thanks for your help! -- Eric On Tue, Jun 30, 2015 at 5:28 PM, Debasish Das debasish.da...@gmail.com wrote: I got good runtime improvement from hive partitioninp, caching the dataset and increasing the cores through repartition...I think for your case generating mysql style indexing will help further..it is not supported in spark sql yet... I know the dataset might be too big for 1 node mysql but do you have a runtime estimate from running the same query on mysql with appropriate column indexing ? That should give us a good baseline number... For my case at least I could not put the data on 1 node mysql as it was big... If you can write the problem in a document view you can use a document store like solr/elastisearch to boost runtime...the reverse indices can get you subsecond latencies...again the schema design matters for that and you might have to let go some of sql expressiveness (like balance in a predefined bucket might be fine but looking for the exact number might be slow)
Re: Calling MLLib from SparkR
The 1.4 release does not support calling MLLib from SparkR. We are working on it as a part of https://issues.apache.org/jira/browse/SPARK-6805 On Wed, Jul 1, 2015 at 4:23 PM, Sourav Mazumder sourav.mazumde...@gmail.com wrote: Hi, Does Spark 1.4 support calling MLLib directly from SparkR ? If not, is there any work around, any example available somewhere ? Regards, Sourav
Calling MLLib from SparkR
Hi, Does Spark 1.4 support calling MLLib directly from SparkR ? If not, is there any work around, any example available somewhere ? Regards, Sourav
Re: Difference between spark-defaults.conf and SparkConf.set
.addJar works for me when i run it as a stand-alone application (without using spark-submit) Thanks Best Regards On Tue, Jun 30, 2015 at 7:47 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi folks, running into a pretty strange issue: I'm setting spark.executor.extraClassPath spark.driver.extraClassPath to point to some external JARs. If I set them in spark-defaults.conf everything works perfectly. However, if I remove spark-defaults.conf and just create a SparkConf and call .set(spark.executor.extraClassPath,...) .set(spark.driver.extraClassPath,...) I get ClassNotFound exceptions from Hadoop Conf: Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.ceph.CephFileSystem not found at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1493) at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1585) This seems like a bug to me -- or does spark-defaults.conf somehow get processed differently? I have dumped out sparkConf.toDebugString and in both cases (spark-defaults.conf/in code sets) it seems to have the same values in it...
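One likely explanation for the difference, worth verifying for this deploy mode: spark-submit reads spark-defaults.conf before it launches the driver JVM, so spark.driver.extraClassPath can be put on the driver's real classpath; a SparkConf built inside main() is only seen after the driver JVM is already running, so the driver-side entry arrives too late even though executors still pick up theirs. A sketch of the two forms, with placeholder paths:

```scala
import org.apache.spark.SparkConf

// conf/spark-defaults.conf (read by spark-submit before the driver starts):
//   spark.driver.extraClassPath   /opt/ceph/ceph-hadoop.jar
//   spark.executor.extraClassPath /opt/ceph/ceph-hadoop.jar

// equivalent in code (executor side works; driver side may be too late):
val conf = new SparkConf()
  .set("spark.executor.extraClassPath", "/opt/ceph/ceph-hadoop.jar")
  .set("spark.driver.extraClassPath", "/opt/ceph/ceph-hadoop.jar")
```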
Run multiple Spark jobs concurrently
Hi All, Is there any additional configs that we have to do to perform $subject? -- Thanks regards, Nirmal Associate Technical Lead - Data Technologies Team, WSO2 Inc. Mobile: +94715779733 Blog: http://nirmalfdo.blogspot.com/
question about resource allocation on the spark standalone cluster
Hello spark-users, I would like to use the spark standalone cluster for multi-tenants, to run multiple apps at the same time. The issue is, when submitting an app to the spark standalone cluster, you cannot pass --num-executors like on yarn, but only --total-executor-cores. *This may cause starvation when submitting multiple apps*. Here's an example: Say I have a cluster of 4 machines with 20GB RAM and 4 cores. In case I submit using --total-executor-cores=4 and --executor-memory=20GB, I may get these 2 extreme resource allocations: - 4 workers (on 4 machines) with 1 core each, 20GB each, blocking the entire cluster - 1 worker (on 1 machine) with 4 cores, 20GB for this machine, leaving 3 free machines to be used by other apps. Is there a way to restrict / push the standalone cluster towards the 2nd strategy (use all cores of a given worker before using a second worker)? A workaround that we did is to set SPARK_WORKER_CORES to 1, SPARK_WORKER_MEMORY to 5gb and SPARK_WORKER_INSTANCES to 4, but this is suboptimal since it runs 4 worker instances on 1 machine, which has the JVM overhead, and does not allow to share memory across partitions on the same worker. Thanks, Tomer
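If the goal is the second strategy, the standalone master has a setting for exactly this trade-off; a sketch of the relevant line, set on the master's configuration and followed by a master restart:

```
# spark.deploy.spreadOut=true (default) spreads an app's cores across many workers;
# false packs them onto as few workers as possible (the 2nd allocation above)
spark.deploy.spreadOut   false
```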
Re: Issues in reading a CSV file from local file system using spark-shell
Since its a windows machine, you are very likely to be hitting this one https://issues.apache.org/jira/browse/SPARK-2356 Thanks Best Regards On Wed, Jul 1, 2015 at 12:36 AM, Sourav Mazumder sourav.mazumde...@gmail.com wrote: Hi, I'm running Spark 1.4.0 without Hadoop. I'm using the binary spark-1.4.0-bin-hadoop2.6. I start the spark-shell as : spark-shell --master local[2] --packages com.databricks:spark-csv_2.11:1.1.0 --executor-memory 2G --conf spark.local.dir=C:/Users/Sourav. Then I run : val df = sqlContext.read.format(com.databricks.spark.csv).load(file:///C:/Users/Sourav/Work/SparkDataScience/test.csv). It gives a null pointer exception - 15/06/30 12:03:44 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 15/06/30 12:03:44 INFO Executor: Fetching http://9.49.140.239:64868/jars/com.dat abricks_spark-csv_2.11-1.1.0.jar with timestamp 1435690997767 15/06/30 12:03:44 INFO Utils: Fetching http://9.49.140.239:64868/jars/com.databr icks_spark-csv_2.11-1.1.0.jar to C:\Users\Sourav\spark-18eb9880-4a19-46be-8c23-c 7f7e000c454\userFiles-d4df579c-4672-46ee-836c-d4dd9ea9be23\fetchFileTemp40728667 75534302313.tmp 15/06/30 12:03:44 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at java.lang.ProcessBuilder.start(Unknown Source) at org.apache.hadoop.util.Shell.runCommand(Shell.java:482) at org.apache.hadoop.util.Shell.run(Shell.java:455) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java: 715) at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:873) at org.apache.hadoop.fs.FileUtil.chmod(FileUtil.java:853) at org.apache.spark.util.Utils$.fetchFile(Utils.scala:465) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor $Executor$$updateDependencies$5.apply(Executor.scala:398) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor $Executor$$updateDependencies$5.apply(Executor.scala:390) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply( TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.sca la:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.sca la:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala :226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.s cala:771) at org.apache.spark.executor.Executor.org $apache$spark$executor$Executor $$updateDependencies(Executor.scala:390) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) 15/06/30 12:03:44 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localh ost): java.lang.NullPointerException at java.lang.ProcessBuilder.start(Unknown Source) at org.apache.hadoop.util.Shell.runCommand(Shell.java:482) Any idea what is going wrong. Regards, Sourav
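For context, SPARK-2356 is about Hadoop's shell helpers (chmod etc.) being unavailable on Windows. A sketch of the commonly reported workaround, with placeholder paths: point HADOOP_HOME at a directory that contains bin\winutils.exe before starting the shell.

```
:: Windows cmd, paths are placeholders
set HADOOP_HOME=C:\hadoop
set PATH=%HADOOP_HOME%\bin;%PATH%
spark-shell --master local[2] --packages com.databricks:spark-csv_2.11:1.1.0
```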
DataFrame Filter Inside Another Data Frame Map
Hi All, I am not sure what is wrong with the below code, as it gives the below error when i access it inside the map but it works outside: JavaRDD<Charge> rdd2 = rdd.map(new Function<Charge, Charge>() { @Override public Charge call(Charge ch) throws Exception { * DataFrame df = accountRdd.filter("login=test"); * return ch; } }); 5/07/01 20:38:08 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:129) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
KMeans questions
In preparing a DataFrame (spark 1.4) to use with MLlib's kmeans.train method, is there a cleaner way to create the Vectors than this? data.map { r => Vectors.dense(r.getDouble(0), r.getDouble(3), r.getDouble(4), r.getDouble(5), r.getDouble(6)) } Second, once I train the model and call predict on my vectorized dataset, what's the best way to relate the cluster assignments back to the original data frame? That is, I started with df1, which has a bunch of domain information in each row and also the doubles I use to cluster. I vectorize the doubles and then train on them. I use the resulting model to predict clusters for the vectors. I'd like to look at the original domain information in light of the clusters to which they are now assigned.
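A sketch of one way to do both with the 1.4 MLlib API; k, the iteration count and the column positions just follow the question, and since both RDDs below come from the same DataFrame via map they line up element for element and can be zipped:

```scala
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// slightly more compact vector construction from the chosen column positions
val cols = Array(0, 3, 4, 5, 6)
val vectors = data.map(r => Vectors.dense(cols.map(r.getDouble))).cache()

val model = KMeans.train(vectors, 5, 20)   // k and iterations are placeholders

// relate assignments back to the original rows
val withCluster = data.rdd.zip(vectors.map(model.predict))   // RDD[(Row, Int)]
```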
Re: Calling MLLib from SparkR
Hi Shivaram, Thanks for the confirmation. Wondering, for doing some modeling from SparkR, is there any way I can call an R machine learning library using the bootstrapping method specified in https://amplab-extras.github.io/SparkR-pkg/? It looks like the RDD APIs are now private in SparkR, so I see no way to call some other function from SparkR. Regards, Sourav On Wed, Jul 1, 2015 at 4:52 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: The 1.4 release does not support calling MLLib from SparkR. We are working on it as a part of https://issues.apache.org/jira/browse/SPARK-6805 On Wed, Jul 1, 2015 at 4:23 PM, Sourav Mazumder sourav.mazumde...@gmail.com wrote: Hi, Does Spark 1.4 support calling MLLib directly from SparkR ? If not, is there any work around, any example available somewhere ? Regards, Sourav