Re: Spark processing of the _temporary folder on S3 is very slow and always causes failure
I'm not super familiar with S3, but I think the issue is that you want to use a different output committer with object stores, which don't have a simple move operation. There have been a few other threads on S3 output committers. I think the most relevant one for you is probably this open JIRA: https://issues.apache.org/jira/browse/SPARK-6352

On Fri, Mar 13, 2015 at 5:51 PM, Shuai Zheng szheng.c...@gmail.com wrote:

Hi All,

I am trying to run a sort on an r3.2xlarge instance on AWS, set up as a single-node cluster for testing. The data I sort is around 4GB and sits on S3, and the output will also go to S3. I just connect spark-shell to the local cluster and run the code as a script (because I just want a benchmark for now). My job is as simple as:

val parquetFile = sqlContext.parquetFile("s3n://...", "s3n://...", "s3n://...", "s3n://...", "s3n://...", "s3n://...", "s3n://...")
parquetFile.registerTempTable("Test")
val sortedResult = sqlContext.sql("SELECT * FROM Test order by time").map { row => row.mkString("\t") }
sortedResult.saveAsTextFile("s3n://myplace")

The job takes around 6 minutes to finish the sort when I monitor the process. After that I notice the process stops at:

15/03/13 22:38:27 INFO DAGScheduler: Job 2 finished: saveAsTextFile at console:31, took 581.304992 s

At that point, Spark has actually just written all the data to the _temporary folder first; after all sub-tasks finish, it tries to move all the finished results from the _temporary folder to the final location. This process can be quick locally (because it is just a rename), but it looks very slow on my S3: it takes a few seconds to move one file (usually there are 200 partitions), and then it raises exceptions after moving maybe 40-50 files:

org.apache.http.NoHttpResponseException: The target server failed to respond
at org.apache.http.impl.conn.DefaultResponseParser.parseHead(DefaultResponseParser.java:101)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:252)
at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:281)
at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:247)
at org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219)

I tried several times but never got the full job to finish. I am not sure anything is wrong here; I am using something very basic, and I can see the job has finished with all results on S3 under the _temporary folder, but then it raises the exception and fails. Is there any special setting I should use when dealing with S3? I don't know what the issue is here; I have never seen MapReduce have a similar issue, so it should not be S3's problem.

Regards,
Shuai
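To make the committer suggestion above concrete: the rename step can be skipped entirely by plugging in a committer that writes task output straight to the final location. The sketch below is a minimal, hypothetical example (the class and wiring are illustrative, not the JIRA's eventual fix), and note that a direct committer gives up the atomicity that protects against failed or speculative tasks:

import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

// Minimal direct committer: tasks write to the final path, so there is
// nothing to move out of _temporary at job end.
class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  // No per-task commit needed; output already sits at its final location.
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
}

// Wire it into the old mapred API that saveAsTextFile uses:
sc.hadoopConfiguration.set("mapred.output.committer.class",
  classOf[DirectOutputCommitter].getName)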
Re: Need Advice about reading lots of text files
Interesting; on another thread, I was just arguing that the user should *not* open the files themselves and read them, because then they lose all the other goodies we have in HadoopRDD, e.g. the metric tracking. I think this strengthens Pat's argument that we might actually need better support for this in the Spark context itself?

On Sat, Mar 14, 2015 at 1:11 PM, Michael Armbrust mich...@databricks.com wrote:

Here is how I have dealt with many small text files (on S3, though this should generalize) in the past: http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201411.mbox/%3ccaaswr-58p66-es2haxh4i+bu__0rvxd2okewkly0mee8rue...@mail.gmail.com%3E

From: Michael Armbrust mich...@databricks.com
Subject: Re: S3NativeFileSystem inefficient implementation when calling sc.textFile
Date: Thu, 27 Nov 2014 03:20:14 GMT

In the past I have worked around this problem by avoiding sc.textFile(). Instead I read the data directly inside of a Spark job. Basically, you start with an RDD where each entry is a file in S3 and then flatMap that with something that reads the files and returns the lines. Here's an example: https://gist.github.com/marmbrus/fff0b058f134fa7752fe

Using this class you can do something like:

sc.parallelize("s3n://mybucket/file1" :: "s3n://mybucket/file2" :: ... :: Nil).flatMap(new ReadLinesSafe(_))

You can also build up the list of files by running a Spark job: https://gist.github.com/marmbrus/15e72f7bc22337cf6653

Michael

On Sat, Mar 14, 2015 at 10:38 AM, Pat Ferrel p...@occamsmachete.com wrote:

It's a long story, but there are many dirs with smallish part- files in them, so we create a list of the individual files as input to sparkContext.textFile(fileList). I suppose we could move them and rename them to be contiguous part- files in one dir. Would that be better than passing in a long list of individual filenames? We could also make the part files much larger by collecting the smaller ones. But would any of this make a difference in IO speed?

I ask because using the long file list seems to read what amounts to a not very large data set rather slowly. If it were all in large part files in one dir I'd expect it to go much faster, but this is just intuition.

On Mar 14, 2015, at 9:58 AM, Koert Kuipers ko...@tresata.com wrote:

Why can you not put them in a directory and read them as one input? You will get a task per file, but Spark is very fast at executing many tasks (it's not a JVM per task).

On Sat, Mar 14, 2015 at 12:51 PM, Pat Ferrel p...@occamsmachete.com wrote: Any advice on dealing with a large number of separate input files?

On Mar 13, 2015, at 4:06 PM, Pat Ferrel p...@occamsmachete.com wrote:

We have many text files that we need to read in parallel. We can create a comma-delimited list of files to pass in to sparkContext.textFile(fileList). The list can get very large (maybe 1) and is all on HDFS. The question is: what is the most performant way to read them? Should they be broken up and read in groups, appending the resulting RDDs, or should we just pass in the entire list at once? In effect I'm asking whether Spark does some optimization or whether we should do it explicitly. If the latter, what rule might we use depending on our cluster setup?
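A minimal sketch of the approach Michael describes (an RDD of paths flatMapped through a reader), assuming Hadoop FileSystem access is configured; the linked ReadLinesSafe gist is the robust version with error handling, and the paths here are illustrative:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

// Open one file through the Hadoop FileSystem API and return its lines.
def readLines(path: String): Seq[String] = {
  val fs = FileSystem.get(new java.net.URI(path), new Configuration())
  val in = fs.open(new Path(path))
  try Source.fromInputStream(in).getLines().toList
  finally in.close()
}

val files = Seq("s3n://mybucket/file1", "s3n://mybucket/file2")
val lines = sc.parallelize(files).flatMap(readLines)

Because the reads happen inside tasks, the files are fetched in parallel across the cluster instead of being enumerated one by one in the driver.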
HIVE SparkSQL
Hi: I need to migrate a log analysis system from MySQL plus a C++ real-time compute framework to the Hadoop ecosystem. I want to build a data warehouse, but I don't know which is the right choice: Cassandra? Hive? Or just Spark SQL? There are few benchmarks for these systems. My scenario is as follows: every 5 seconds, Flume transfers a log file from an IDC. The log file is pre-formatted to suit a MySQL LOAD event. There are many IDCs, and they close down or reconnect to Flume at random. Every online IDC must receive analysis of its logs every 5 minutes. Any suggestions? Thanks. Yours, Meng
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Hi Todd, Yes, those entries were present in the conf under the same SPARK_HOME that was used to run spark-submit. On a related note, I'm assuming that the additional Spark YARN options (like spark.yarn.jar) need to be set in the same properties file that is passed to spark-submit. Apart from that, I assume that no other host on the cluster should require a deployment of the Spark distribution or any other config change to support a Spark job. Isn't that correct?

On Tue, Mar 17, 2015 at 6:19 PM, Todd Nist tsind...@gmail.com wrote:

Hi Bharath, Do you have these entries in your $SPARK_HOME/conf/spark-defaults.conf file?

spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0-2041
spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041

On Tue, Mar 17, 2015 at 1:04 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Still no luck running purpose-built 1.3 against HDP 2.2 after following all the instructions. Has anyone else faced this issue?

On Mon, Mar 16, 2015 at 8:53 PM, Bharath Ravi Kumar reachb...@gmail.com wrote:

Hi Todd, Thanks for the help. I'll try again after building a distribution with the 1.3 sources. However, I wanted to confirm what I mentioned earlier: is it sufficient to copy the distribution only to the client host from where spark-submit is invoked (with spark.yarn.jar set), or is there a need to ensure that the entire distribution is pre-deployed on every host in the YARN cluster? I'd assume the latter shouldn't be necessary.

On Mon, Mar 16, 2015 at 8:38 PM, Todd Nist tsind...@gmail.com wrote:

Hi Bharath, I ran into the same issue a few days ago; here is a link to a post on Hortonworks' forum: http://hortonworks.com/community/forums/search/spark+1.2.1/

In case anyone else needs to perform this, these are the steps I took to get it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3:

1. Pull the 1.2.1 source.
2. Apply the following patches:
   a. Address the jackson version: https://github.com/apache/spark/pull/3938
   b. Address the propagation of hdp.version set in spark-defaults.conf: https://github.com/apache/spark/pull/3409
3. Build with: $SPARK_HOME/make-distribution.sh --name hadoop2.6 --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package

Then deploy the resulting artifact (spark-1.2.1-bin-hadoop2.6.tgz) following the instructions in the HDP Spark preview: http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/

FWIW, spark-1.3.0 appears to be working fine with HDP as well, and steps 2a and 2b are not required.

HTH

-Todd

On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar reachb...@gmail.com wrote:

Hi, Trying to run Spark (1.2.1 built for HDP 2.2) against a YARN cluster results in the AM failing to start with the following error on stderr:

Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher

An application id was assigned to the job, but there were no logs. Note that the Spark distribution has not been installed on every host in the cluster; the aforementioned Spark build was copied to one of the Hadoop client hosts in the cluster to launch the job. spark-submit was run with --master yarn-client, and spark.yarn.jar was set to the assembly jar from the above distribution. Switching the Spark distribution to the HDP-recommended version and following the instructions on this page http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ did not fix the problem either. Any idea what may have caused this error?

Thanks,
Bharath
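Pulling the thread's settings together into one place — the hdp.version string, assembly jar path, and class name below are illustrative, not prescriptive. In principle, when spark.yarn.jar points at a location reachable by the cluster (e.g. HDFS), only the client host needs the distribution:

# $SPARK_HOME/conf/spark-defaults.conf
spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0-2041
spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041
spark.yarn.jar hdfs:///apps/spark/spark-assembly-1.2.1-hadoop2.6.0.jar

# launched from the client host only (class and jar names are hypothetical)
$SPARK_HOME/bin/spark-submit --master yarn-client --class com.example.MyApp my-app.jar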
Re: Apache Spark Executor - number of threads
Hi devs, I would like to know this as well. It would be great if someone could provide this information. Cheers

On Tue, Mar 17, 2015 at 3:06 PM, Igor Petrov [via Apache Spark User List] ml-node+s1001560n22095...@n3.nabble.com wrote:

Hello, is it possible to set the number of threads in the Executor's pool? I see no such setting in the docs. The reason we want to try it: we want to see the performance impact of different levels of parallelism (having one thread per CPU, two threads per CPU, N threads per CPU).

Thank You

--
Niranda
Re: What is best way to run spark job in yarn-cluster mode from java program(servlet container) and NOT using spark-submit command.
Create a SparkContext, set the master to yarn-cluster, and then run it as a standalone program?

Thanks
Best Regards

On Tue, Mar 17, 2015 at 1:27 AM, rrussell25 rrussel...@gmail.com wrote:

Hi, were you ever able to determine a satisfactory approach for this problem? I have a similar situation and would prefer to execute the job directly from Java code within my JMS listener and/or servlet container.
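A minimal sketch of the reply's suggestion, with illustrative names. One caveat worth hedging on: yarn-cluster mode normally expects to be launched through spark-submit, so embedding a driver in a servlet container typically means yarn-client mode (the driver stays in your JVM), with the Spark and YARN configuration on the classpath:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("yarn-client") // driver runs inside the servlet container's JVM
  .setAppName("embedded-job")
val sc = new SparkContext(conf)
try {
  println(sc.parallelize(1 to 100).sum())
} finally {
  sc.stop()
}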
Re: Hive on Spark with Spark as a service on CDH5.2
Hive on Spark and accessing a HiveContext from the shell are separate things. Hive on Spark: https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started

To access Hive from Spark you need to build Spark with -Phive: http://spark.apache.org/docs/1.2.1/building-spark.html#building-with-hive-and-jdbc-support

On Tue, Mar 17, 2015 at 11:35 AM, anu anamika.guo...@gmail.com wrote:

I am not clear whether Spark SQL supports Hive on Spark when Spark is run as a service in CDH 5.2. Can someone please clarify this? If this is possible, what configuration changes do I have to make to import a hive context in spark-shell, as well as to be able to do a spark-submit for the job to be run on the entire cluster?

--
Arush Kharbanda || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
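For reference, the -Phive build described on the linked page looks roughly like this for the 1.2.1 line (the profiles and Hadoop version should be matched to your cluster; this is the documented general shape, not a CDH-specific recipe):

mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package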
Re: Spark @ EC2: Futures timed out Ask timed out
Did you launch the cluster using the spark-ec2 script? Just make sure all ports are open in the master and slave instances' security groups. From the error, it seems it's not able to connect to the driver program (port 58360).

Thanks
Best Regards

On Tue, Mar 17, 2015 at 3:26 AM, Otis Gospodnetic otis.gospodne...@gmail.com wrote:

Hi, I've been trying to run a simple SparkWordCount app on EC2, but it looks like my apps are not succeeding/completing. I'm suspecting some sort of communication issue. I used the SparkWordCount app from http://blog.cloudera.com/blog/2014/04/how-to-run-a-simple-apache-spark-app-in-cdh-5/

Digging through logs I found this:

15/03/16 21:28:20 INFO Utils: Successfully started service 'driverPropsFetcher' on port 58123.
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1563)
at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:60)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:115)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:163)
at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:127)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
... 4 more

Or exceptions like:

Caused by: akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://sparkDriver@ip-10-111-222-111.ec2.internal:58360/), Path(/user/CoarseGrainedScheduler)]] after [3 ms]
at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333)
at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117)
at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467)
at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419)
at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423)
at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375)
at java.lang.Thread.run(Thread.java:745)

This is in EC2, and I have ports 22, 7077, 8080, and 8081 open to any source. But maybe I need to do something more? I do see the Master sees the Workers, and the Workers do connect to the Master.
I did run this in spark-shell, and it runs without problems:

scala> val something = sc.parallelize(1 to 1000).collect().filter(_ < 1000)

This is how I submitted the job (on the Master machine):

$ spark-1.2.1-bin-hadoop2.4/bin/spark-submit --class com.cloudera.sparkwordcount.SparkWordCount --executor-memory 256m --master spark://ip-10-171-32-62:7077 wc-spark/target/sparkwordcount-0.0.1-SNAPSHOT.jar /usr/share/dict/words 0

Any help would be greatly appreciated.

Thanks,
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/
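One thing the security group rules above don't cover: the executors connect back to the driver on an ephemeral port (58360 in the AskTimeoutException above), which is not in the 22/7077/8080/8081 list. A hedged sketch of pinning those ports so they can be opened explicitly — the port numbers here are arbitrary examples:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("SparkWordCount")
  .set("spark.driver.port", "51000")       // driver RPC port, normally random
  .set("spark.blockManager.port", "51001") // block manager port, normally random
// then open 51000-51001 in the security group alongside 7077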
Re: Any IRC channel on Spark?
Dears,

Are there any instructions to build Spark 1.3.0 on Windows 7? I tried

mvn -Phive -Phive-thriftserver -DskipTests clean package

but I got the errors below:

[INFO] Spark Project Parent POM ... SUCCESS [ 7.845 s]
[INFO] Spark Project Networking ... SUCCESS [ 26.209 s]
[INFO] Spark Project Shuffle Streaming Service  SUCCESS [ 9.701 s]
[INFO] Spark Project Core . SUCCESS [04:29 min]
[INFO] Spark Project Bagel  SUCCESS [ 22.215 s]
[INFO] Spark Project GraphX ... SUCCESS [ 59.676 s]
[INFO] Spark Project Streaming  SUCCESS [01:46 min]
[INFO] Spark Project Catalyst . SUCCESS [01:40 min]
[INFO] Spark Project SQL .. SUCCESS [03:05 min]
[INFO] Spark Project ML Library ... FAILURE [03:49 min]
[INFO] Spark Project Tools  SKIPPED
[INFO] Spark Project Hive . SKIPPED
[INFO] Spark Project REPL . SKIPPED
[INFO] Spark Project Hive Thrift Server ... SKIPPED
[INFO] Spark Project Assembly . SKIPPED
[INFO] Spark Project External Twitter . SKIPPED
[INFO] Spark Project External Flume Sink .. SKIPPED
[INFO] Spark Project External Flume ... SKIPPED
[INFO] Spark Project External MQTT  SKIPPED
[INFO] Spark Project External ZeroMQ .. SKIPPED
[INFO] Spark Project External Kafka ... SKIPPED
[INFO] Spark Project Examples . SKIPPED
[INFO] Spark Project External Kafka Assembly .. SKIPPED
[INFO] BUILD FAILURE
[INFO] Total time: 16:58 min
[INFO] Finished at: 2015-03-17T11:04:40+03:00
[INFO] Final Memory: 77M/1840M
[ERROR] Failed to execute goal org.scalastyle:scalastyle-maven-plugin:0.4.0:check (default) on project spark-mllib_2.10: Failed during scalastyle execution -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn <goals> -rf :spark-mllib_2.10

On Tue, Mar 17, 2015 at 10:06 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

There's one on Freenode; you can join #Apache-Spark. There are like 60 people idling. :)

Thanks
Best Regards

On Mon, Mar 16, 2015 at 10:46 PM, Feng Lin lfliu.x...@gmail.com wrote: Hi, everyone, I'm wondering whether there is a possibility to set up an official IRC channel on freenode. I noticed that a lot of Apache projects have such a channel to let people talk directly. Best, Michael
Build Spark 1.3.0 on Windows 7
Sorry for the old subject; I am correcting it.

On Tue, Mar 17, 2015 at 11:47 AM, Ahmed Nawar ahmed.na...@gmail.com wrote:

Dears, Are there any instructions to build Spark 1.3.0 on Windows 7? I tried mvn -Phive -Phive-thriftserver -DskipTests clean package but I got the errors below:

[... same build output as in the previous message, failing with:]
[INFO] Spark Project ML Library ... FAILURE [03:49 min]
[ERROR] Failed to execute goal org.scalastyle:scalastyle-maven-plugin:0.4.0:check (default) on project spark-mllib_2.10: Failed during scalastyle execution -> [Help 1]
Re: Priority queue in spark
In that case, having pre-configured pools, but selecting the correct pool at the code level, might do.

On Tue, Mar 17, 2015 at 11:23 AM, abhi abhishek...@gmail.com wrote:

Yes. Each generated job can have a different priority. It is like a recursive function: in each iteration, the generated job will be submitted to the Spark cluster based on its priority. Jobs with lower priority, or below some threshold, will be discarded.

Thanks,
Abhi

On Mon, Mar 16, 2015 at 10:36 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi Abhi, Do you mean each task of a job can have a different priority, or that jobs generated by one job can have different priorities?

On Tue, Mar 17, 2015 at 11:04 AM, Mark Hamstra m...@clearstorydata.com wrote: http://apache-spark-developers-list.1001551.n3.nabble.com/Job-priority-td10076.html#a10079

On Mon, Mar 16, 2015 at 10:26 PM, abhi abhishek...@gmail.com wrote: If I understand correctly, the above document creates pools for priorities, which are static in nature and have to be defined before submitting the job. In my scenario, each generated task can have a different priority.

Thanks,
Abhi

On Mon, Mar 16, 2015 at 9:48 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote: Hi, Maybe this is what you are looking for: http://spark.apache.org/docs/1.2.0/job-scheduling.html#fair-scheduler-pools

Thanks,

On Mon, Mar 16, 2015 at 8:15 PM, abhi abhishek...@gmail.com wrote: Hi, Currently all the jobs in Spark get submitted using a queue. I have a requirement where a submitted job will generate another set of jobs with some priority, which should again be submitted to the Spark cluster based on priority, meaning jobs with higher priority should be executed first. Is it feasible? Any help is appreciated.

Thanks,
Abhi
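A minimal sketch of selecting a pre-configured pool from code, assuming a fairscheduler.xml that defines pools named "high" and "low" (the names are illustrative):

// run under the "high" pool
sc.setLocalProperty("spark.scheduler.pool", "high")
highPriorityRdd.count()

// run under the "low" pool
sc.setLocalProperty("spark.scheduler.pool", "low")
lowPriorityRdd.count()

// reset to the default pool
sc.setLocalProperty("spark.scheduler.pool", null)

Because the pool is a thread-local property, jobs submitted from different threads can run in different pools concurrently.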
Re: MappedStream vs Transform API
Hi, Thank you for the response. Can I open a PR to use transform for all the functions like map, flatMap etc., so they are consistent with the other APIs?

Regards, Madhukara Phatak http://datamantra.io/

On Mon, Mar 16, 2015 at 11:42 PM, Tathagata Das t...@databricks.com wrote:

It's mostly for legacy reasons. First we had added all the MappedDStream, etc., and then later we realized we needed to expose something that is more generic for arbitrary RDD-to-RDD transformations. It can be easily replaced. However, there is a slight value in having MappedDStream, for developers to learn about DStreams.

TD

On Mon, Mar 16, 2015 at 3:37 AM, madhu phatak phatak@gmail.com wrote:

Hi, Thanks for the response. I understand that part. But I am asking why the internal implementation uses a subclass when it can use an existing API. Unless there is a real difference, it feels like a code smell to me.

Regards, Madhukara Phatak http://datamantra.io/

On Mon, Mar 16, 2015 at 2:14 PM, Shao, Saisai saisai.s...@intel.com wrote:

I think these two ways are both OK for you to write a streaming job. `transform` is a more general way for you to transform from one DStream to another if there's no related DStream API (but there is a related RDD API). But using map may be more straightforward and easy to understand.

Thanks
Jerry

From: madhu phatak [mailto:phatak@gmail.com]
Sent: Monday, March 16, 2015 4:32 PM
To: user@spark.apache.org
Subject: MappedStream vs Transform API

Hi, The current implementation of the map function in Spark Streaming looks as below:

def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
  new MappedDStream(this, context.sparkContext.clean(mapFunc))
}

It creates an instance of MappedDStream, which is a subclass of DStream. The same function could also be implemented using the transform API:

def map[U: ClassTag](mapFunc: T => U): DStream[U] = this.transform(rdd => {
  rdd.map(mapFunc)
})

Both implementations look the same. If they are the same, is there any advantage to having a subclass of DStream? Why can't we just use the transform API?

Regards, Madhukara Phatak http://datamantra.io/
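Where transform earns its keep is for RDD operations that have no DStream counterpart. A small sketch, assuming a hypothetical stream of type DStream[String]:

// sortBy exists on RDD but not on DStream, so transform bridges the gap,
// sorting the records of each batch independently.
val sorted = stream.transform(rdd => rdd.sortBy(identity))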
Re: why generateJob is a private API?
Hi, Thank you for the response.

Regards, Madhukara Phatak http://datamantra.io/

On Tue, Mar 17, 2015 at 5:50 AM, Tathagata Das t...@databricks.com wrote:

It was not really meant to be public and overridden, because anything you want to do to generate jobs from RDDs can be done using DStream.foreachRDD.

On Sun, Mar 15, 2015 at 11:14 PM, madhu phatak phatak@gmail.com wrote:

Hi, I am trying to create a simple subclass of DStream. If I understand correctly, I should override compute for lazy operations and generateJob for actions. But when I try to override generateJob, it gives an error saying the method is private to the streaming package. Is my approach correct, or am I missing something?

Regards, Madhukara Phatak http://datamantra.io/
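A minimal sketch of the foreachRDD route TD suggests, assuming a hypothetical dstream of type DStream[String] — each action inside the closure submits a Spark job per batch, which is exactly what generateJob would have done:

dstream.foreachRDD { rdd =>
  // any RDD action here generates a job for this batch
  val count = rdd.count()
  println(s"records in this batch: $count")
}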
Re: Upgrade from Spark 1.1.0 to 1.1.1+ Issues
Could you tell me everything you did to change the version of Spark? Can you fire up a spark-shell, run this line, and see what happens?

sc.parallelize(1 to 1).collect()

Thanks
Best Regards

On Mon, Mar 16, 2015 at 11:13 PM, Eason Hu eas...@gmail.com wrote:

Hi Akhil, Yes, I did change both versions, on the project and on the cluster. Any clues? Even the sample code from the Spark website failed to work.

Thanks,
Eason

On Sun, Mar 15, 2015 at 11:56 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

Did you change both versions? The one in the build file of your project and the Spark version of your cluster?

Thanks
Best Regards

On Sat, Mar 14, 2015 at 6:47 AM, EH eas...@gmail.com wrote:

Hi all, I've been using Spark 1.1.0 for a while, and now would like to upgrade to Spark 1.1.1 or above. However, it throws the following errors:

18:05:31.522 [sparkDriver-akka.actor.default-dispatcher-3] ERROR TaskSchedulerImpl - Lost executor 37 on hcompute001: remote Akka client disassociated
18:05:31.530 [sparkDriver-akka.actor.default-dispatcher-3] WARN TaskSetManager - Lost task 0.0 in stage 1.0 (TID 0, hcompute001): ExecutorLostFailure (executor lost)
18:05:31.567 [sparkDriver-akka.actor.default-dispatcher-2] ERROR TaskSchedulerImpl - Lost executor 3 on hcompute001: remote Akka client disassociated
18:05:31.568 [sparkDriver-akka.actor.default-dispatcher-2] WARN TaskSetManager - Lost task 1.0 in stage 1.0 (TID 1, hcompute001): ExecutorLostFailure (executor lost)
18:05:31.988 [sparkDriver-akka.actor.default-dispatcher-23] ERROR TaskSchedulerImpl - Lost executor 24 on hcompute001: remote Akka client disassociated

Do you know what may have gone wrong? I didn't change any code, just changed the version of Spark.

Thank you all,
Eason
Hive on Spark with Spark as a service on CDH5.2
I am not clear whether Spark SQL supports Hive on Spark when Spark is run as a service in CDH 5.2. Can someone please clarify this? If this is possible, what configuration changes do I have to make to import a hive context in spark-shell, as well as to be able to do a spark-submit for the job to be run on the entire cluster?
Re: Any IRC channel on Spark?
There's one on Freenode; you can join #Apache-Spark. There are like 60 people idling. :)

Thanks
Best Regards

On Mon, Mar 16, 2015 at 10:46 PM, Feng Lin lfliu.x...@gmail.com wrote: Hi, everyone, I'm wondering whether there is a possibility to set up an official IRC channel on freenode. I noticed that a lot of Apache projects have such a channel to let people talk directly. Best, Michael
Re: HiveContext can't find registered function
Initially, an attribute reference (column reference), like selecting a column from a table, is not resolved, since we do not know if the reference is valid (whether this column exists in the underlying table). In the query compilation process, we first analyze the query and resolve those attribute references. A resolved attribute reference means the reference is valid and we know where to get the column values from the input. Hope this is helpful.

On Tue, Mar 17, 2015 at 2:19 PM, Ophir Cohen oph...@gmail.com wrote: Thank you for the answer, and one more question: what does 'resolved attribute' mean?

On Mar 17, 2015 8:14 PM, Yin Huai yh...@databricks.com wrote: The number is an id we use internally to identify a resolved attribute. Looks like basic_null_diluted_d was not resolved, since there is no id associated with it.

On Tue, Mar 17, 2015 at 2:08 PM, Ophir Cohen oph...@gmail.com wrote: Interesting, I thought the problem was with the method itself. I will check it soon and update. Can you elaborate on what the # and the number mean? Is that a reference to the field in the RDD? Thank you, Ophir

On Mar 17, 2015 7:06 PM, Yin Huai yh...@databricks.com wrote: It seems basic_null_diluted_d was not resolved? Can you check if basic_null_diluted_d is in your table?

On Tue, Mar 17, 2015 at 9:34 AM, Ophir Cohen oph...@gmail.com wrote:

Hi Guys, I'm registering a function using:

sqlc.registerFunction("makeEstEntry", ReutersDataFunctions.makeEstEntry _)

Then I register the table and try to query the table using that function, and I get:

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'makeEstEntry(numest#20,median#21,mean#22,stddev#23,high#24,low#25,currency_#26,units#27,'basic_null_diluted_d) AS FY0#2837, tree:

Thanks!
Ophir
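For reference, the working shape of a UDF registration and call in the 1.2-era API — the table and column names here mirror the thread and are otherwise illustrative; the fix implied above is making sure basic_null_diluted_d actually exists in the queried table:

sqlc.registerFunction("makeEstEntry", ReutersDataFunctions.makeEstEntry _)
val result = sqlc.sql(
  "SELECT makeEstEntry(numest, median, mean, stddev, high, low, currency_, units, basic_null_diluted_d) FROM myTable")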
graceful shutdown not so graceful?
Hi all, I am trying to do a graceful shutdown of my Spark Streaming job, and it appears that everything shuts down gracefully except the checkpointing thread, which continues to run until it crashes. I looked at the checkpoint thread in 1.3.0 (https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala), and it appears the write method in CheckpointWriter will try to schedule a new CheckpointWriteHandler (and get the below exception) regardless of the value of 'stopped', which would be set to 'true' as it was stopped by the graceful shutdown. Is this a bug? Shouldn't the write method refrain from scheduling anything if stopped is true? Thanks!

This is what I'm doing:
=

class TestStreaming extends FunSuite with BeforeAndAfterAll {
  @transient var sc: SparkContext = _
  @transient var ssc: StreamingContext = _

  override def beforeAll() = {
    System.clearProperty("spark.driver.port")
    System.clearProperty("spark.hostPort")
    System.setProperty("spark.cleaner.ttl", "300")
    val sparkConf = new SparkConf().setAppName("testSpark").setMaster("local[4]")
    sc = new SparkContext(sparkConf)
    ssc = new StreamingContext(sc, Seconds(1))
  }

  override def afterAll() = {
    val stopSparkContext = true
    val stopGracefully = true
    ssc.stop(stopSparkContext, stopGracefully)
    sc = null
    ssc = null
    System.clearProperty("spark.driver.port")
    System.clearProperty("spark.hostPort")
  }

  test("testStreaming") {
    val rddQueue = new SynchronizedQueue[RDD[JValue]]()
    val inputStream = ssc.queueStream(rddQueue)
    rddQueue += ssc.sparkContext.makeRDD(TestInput.reports("disney"))
    val hydratedReports = ReportHydrator.hydrate(inputStream)
    ApplicationPropertyGenerator.generateFrom(hydratedReports).foreachRDD(rdd => rdd.foreach(println(_)))
    ssc.checkpoint("reports/streaming")
    ssc.start()
  }
}

This is the output I get when shutting down gracefully (the exception is half-way down):
=

15/03/17 12:25:34 INFO Executor: Finished task 3.0 in stage 2.0 (TID 11). 1455 bytes result sent to driver
15/03/17 12:25:34 INFO TaskSetManager: Finished task 3.0 in stage 2.0 (TID 11) in 3019 ms on localhost (4/4)
15/03/17 12:25:34 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
15/03/17 12:25:34 INFO DAGScheduler: Stage 2 (foreachRDD at TestStreaming.scala:42) finished in 3.027 s
15/03/17 12:25:34 INFO DAGScheduler: Job 0 finished: foreachRDD at TestStreaming.scala:42, took 4.492961 s
15/03/17 12:25:34 INFO JobScheduler: Finished job streaming job 142662033 ms.0 from job set of time 142662033 ms
15/03/17 12:25:34 INFO JobScheduler: Total delay: 4.951 s for time 142662033 ms (execution: 4.532 s)
15/03/17 12:25:39 WARN JobGenerator: Timed out while stopping the job generator (timeout = 1)
15/03/17 12:25:39 INFO JobGenerator: Waited for jobs to be processed and checkpoints to be written
15/03/17 12:25:39 INFO CheckpointWriter: CheckpointWriter executor terminated ? true, waited for 0 ms.
15/03/17 12:25:39 INFO JobGenerator: Stopped JobGenerator
15/03/17 12:25:39 INFO JobGenerator: Checkpointing graph for time 142662033 ms
15/03/17 12:25:39 INFO DStreamGraph: Updating checkpoint data for time 142662033 ms
15/03/17 12:25:39 INFO DStreamGraph: Updated checkpoint data for time 142662033 ms
15/03/17 12:25:39 INFO JobScheduler: Stopped JobScheduler
15/03/17 12:25:39 INFO StreamingContext: StreamingContext stopped successfully
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
15/03/17 12:25:39 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
15/03/17 12:25:39 INFO ContextHandler: stopped
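For what it's worth, the guard being proposed would look something like this — purely a hypothetical illustration of the suggestion, not the actual Spark source or its real method signature:

// inside CheckpointWriter (sketch only; real constructor args differ)
def write(checkpoint: Checkpoint): Unit = synchronized {
  if (stopped) {
    // graceful shutdown already ran; don't schedule another handler
    return
  }
  executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, checkpoint))
}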
RE: Date and decimal datatype not working
OK, thanks for the suggestions. Let me try them and I will confirm.

Regards
Ananda

From: Yin Huai [mailto:yh...@databricks.com]
Sent: Tuesday, March 17, 2015 3:04 PM
To: BASAK, ANANDA
Cc: user@spark.apache.org
Subject: Re: Date and decimal datatype not working

p(0) is a String. So, you need to explicitly convert it to a Long, e.g. p(0).trim.toLong. You also need to do it for p(2). For the BigDecimal values, you need to create BigDecimal objects from your String values.

On Tue, Mar 17, 2015 at 5:55 PM, BASAK, ANANDA ab9...@att.com wrote:

[...]
shuffle write size
I have a map-reduce job that reads from three logs and joins them on some key column. The underlying data is protobuf messages in sequence files. Between the mappers and reducers, the underlying raw byte arrays for the protobuf messages are shuffled. Roughly, for 1G of input from HDFS, there is 2G of data output from the map phase.

I am testing Spark jobs (v1.3.0) on the same input. I found that the shuffle write is 3-4 times the input size. I tried passing protobuf Message objects and Array[Byte], but neither gives good shuffle write output. Is there any good practice for shuffling
* protobuf messages
* raw byte arrays

Chen
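Not an answer from this thread, but one commonly suggested approach for cutting shuffle size with opaque payloads like protobuf is to shuffle the serialized bytes under Kryo rather than Java serialization — a hedged sketch, with illustrative names:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("protobuf-join")
  // Kryo writes Array[Byte] values more compactly than Java serialization
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
// then shuffle (joinKey, serializedMessage) pairs, e.g. RDD[(String, Array[Byte])],
// and deserialize the protobuf messages only after the join.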
Date and decimal datatype not working
Hi All, I am very new to the Spark world. I just started some test coding last week. I am using spark-1.2.1-bin-hadoop2.4 and coding in Scala. I am having issues while using the Date and decimal data types. The following is my code, which I am simply running at the Scala prompt. I am trying to define a table and point it to my flat file containing raw data (pipe-delimited format). Once that is done, I will run some SQL queries and put the output data into another flat file with pipe-delimited format.

***
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD

// Define row and table
case class ROW_A(
  TSTAMP: Long,
  USIDAN: String,
  SECNT: Int,
  SECT: String,
  BLOCK_NUM: BigDecimal,
  BLOCK_DEN: BigDecimal,
  BLOCK_PCT: BigDecimal)

val TABLE_A = sc.textFile("/Myhome/SPARK/files/table_a_file.txt").map(_.split("|")).map(p => ROW_A(p(0), p(1), p(2), p(3), p(4), p(5), p(6)))

TABLE_A.registerTempTable("TABLE_A")
***

The second-to-last command gives an error like the following:

<console>:17: error: type mismatch;
 found   : String
 required: Long

It looks like the content from my flat file is always treated as String and not as Date or decimal. How can I make Spark take them as Date or decimal types?

Regards
Ananda
Re: Date and decimal datatype not working
p(0) is a String. So, you need to explicitly convert it to a Long, e.g. p(0).trim.toLong. You also need to do it for p(2). For the BigDecimal values, you need to create BigDecimal objects from your String values.

On Tue, Mar 17, 2015 at 5:55 PM, BASAK, ANANDA ab9...@att.com wrote:

[...]
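Applying that advice to the posted snippet, a minimal sketch assuming the same ROW_A case class — note also that String.split takes a regex, so the pipe must be escaped or every character becomes a separator:

val TABLE_A = sc.textFile("/Myhome/SPARK/files/table_a_file.txt")
  .map(_.split("\\|"))                // "|" alone is regex alternation
  .map(p => ROW_A(
    p(0).trim.toLong,                 // TSTAMP
    p(1),                             // USIDAN
    p(2).trim.toInt,                  // SECNT
    p(3),                             // SECT
    BigDecimal(p(4).trim),            // BLOCK_NUM
    BigDecimal(p(5).trim),            // BLOCK_DEN
    BigDecimal(p(6).trim)))           // BLOCK_PCT

TABLE_A.registerTempTable("TABLE_A")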
Apache Spark Executor - number of threads
Hello, is it possible to set the number of threads in the Executor's pool? I see no such setting in the docs. The reason we want to try it: we want to see the performance impact of different levels of parallelism (having one thread per CPU, two threads per CPU, N threads per CPU).

Thank You
TreeNodeException: Unresolved plan found
Hi Guys, and great job! I've encountered a weird problem in local mode and I'd be glad to get it sorted out... When trying to save a SchemaRDD into a Hive table, it fails with 'TreeNodeException: Unresolved plan found'. I found a similar issue in Jira: https://issues.apache.org/jira/browse/SPARK-4825, but I'm using Spark 1.2.1 and I get the same error. In cluster mode it works as it should, but it fails in local mode.

The code I'm using:

val hc = new HiveContext(new SparkContext(new SparkConf().setMaster("local[*]").setAppName("test-app")))
val file = hc.parquetFile("path to my file")
file.saveAsTable("my_table_name")

And I get the following error:

An exception or error caused a run to abort: Unresolved plan found, tree:
'CreateTableAsSelect None, dailyprice, false, None
 ParquetRelation /home/ophchu/opr/repos/opr-spark/src/test/resources/aapl/derived/splits_divs/reuters/split_adj.pq/part-r-1.parquet, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.hive.HiveContext@a02632b, []

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved plan found, tree:
'CreateTableAsSelect None, dailyprice, false, None
 ParquetRelation /home/ophchu/opr/repos/opr-spark/src/test/resources/aapl/derived/splits_divs/reuters/split_adj.pq/part-r-1.parquet, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.hive.HiveContext@a02632b, []

Again, it happens only when running in local mode.

Thanks!
Ophir
Spark-submit and multiple files
Hello guys, I am having a hard time understanding how spark-submit behaves with multiple files. I have created two code snippets. Each snippet is composed of a main.py and a work.py. The code works if I paste work.py and then main.py into a pyspark shell. However, neither snippet works when using spark-submit, and they generate different errors.

Function add_1 defined outside: http://www.codeshare.io/4ao8B https://justpaste.it/jzvj
Embedded add_1 function definition: http://www.codeshare.io/OQJxq https://justpaste.it/jzvn

I am trying to find a way to make this work. Thank you for your support.
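Not stated in the thread, but the usual way to ship a helper module alongside the main script is spark-submit's --py-files flag, which adds the listed files to the Python path on the driver and executors (file names taken from the post):

spark-submit --py-files work.py main.py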
Re: Unable to find org.apache.spark.sql.catalyst.ScalaReflection class
Launching from Eclipse (Scala IDE) as a Scala process gives this error, but launching as a Java process (a Java main class) works fine. Launching as a Scala process from IntelliJ works fine. There is something wrong on the Eclipse side, not in Spark.

On 03/13/2015 11:47 AM, Jianshi Huang wrote:

Liancheng also found out that the Spark jars are not included in the classpath of the URLClassLoader. Hmm... we're very close to the truth now.

Jianshi

On Fri, Mar 13, 2015 at 6:03 PM, Jianshi Huang jianshi.hu...@gmail.com wrote:

I'm almost certain the problem is the ClassLoader. So adding fork := true solves the problems for test and run. The problem is: how can I fork a JVM for the sbt console? fork in console := true seems not to be working...

Jianshi

On Fri, Mar 13, 2015 at 4:35 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: I guess it's a ClassLoader issue. But I have no idea how to debug it. Any hints?

Jianshi

On Fri, Mar 13, 2015 at 3:00 PM, Eric Charles e...@apache.org wrote:

I have the same issue running Spark SQL code from the Eclipse workspace. If you run your code from the command line (with a packaged jar) or from IntelliJ, I bet it should work. IMHO this is somehow related to the Eclipse env, but I would love to know how to fix it (whether via Eclipse conf, or via a patch in Spark).

On 03/01/2015 02:32 AM, Michael Armbrust wrote:

I think it's possible that the problem is that the Scala compiler is not being loaded by the primordial classloader (but instead by some child classloader), and thus the Scala reflection mirror fails to initialize when it can't find it. Unfortunately, the only solution that I know of is to load all required jars when the JVM starts.

On Sat, Feb 28, 2015 at 5:26 PM, Ashish Nigam ashnigamt...@gmail.com wrote:

Also, can the Scala version play any role here? I am using Scala 2.11.5, but all Spark packages have a dependency on Scala 2.11.2. Just wanted to make sure that the Scala version is not an issue here.

On Sat, Feb 28, 2015 at 9:18 AM, Ashish Nigam ashnigamt...@gmail.com wrote:

Hi, I wrote a very simple program in Scala to convert an existing RDD to a SchemaRDD. But the createSchemaRDD function is throwing an exception:

Exception in thread "main" scala.ScalaReflectionException: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with primordial classloader with boot classpath [.] not found

Here's more info on the versions I am using:

<scala.binary.version>2.11</scala.binary.version>
<spark.version>1.2.1</spark.version>
<scala.version>2.11.5</scala.version>

Please let me know how I can resolve this problem.

Thanks
Ashish

--
Jianshi Huang
LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
Re: MappedStream vs Transform API
Hi,

Regards, Madhukara Phatak http://datamantra.io/

On Tue, Mar 17, 2015 at 2:31 PM, Tathagata Das t...@databricks.com wrote:

That's not super essential, and hence hasn't been done till now. Even in core Spark there are MappedRDD, etc., even though all of them can be implemented by MapPartitionedRDD (maybe the name is wrong). So it's nice to maintain the consistency: MappedDStream creates MappedRDDs. :) Though this does not eliminate the possibility that we will do it. Maybe in the future, if we find that maintaining these different DStreams is becoming a maintenance burden (it isn't yet), we may collapse them to use transform. We did so in the Python API for exactly this reason.

OK. When I was going through the source code, it confused me as to what the right extension points were. So I thought whoever goes through the code may get into the same situation. But if it's not super essential, then OK.

If you are interested in contributing to Spark Streaming, I can point you to a number of issues where your contributions will be more valuable.

Yes please.

TD

On Tue, Mar 17, 2015 at 1:56 AM, madhu phatak phatak@gmail.com wrote:

Hi, Thank you for the response. Can I open a PR to use transform for all the functions like map, flatMap etc., so they are consistent with the other APIs?

Regards, Madhukara Phatak http://datamantra.io/

[... earlier messages quoted in full in the thread above ...]
Re: ClassNotFoundException
Hi Kevin, yes I can test it. Does that mean I have to build Spark from the git repository?

Ralph

On 17.03.15 at 02:59, Kevin (Sangwoo) Kim wrote:

Hi Ralph, It seems like the https://issues.apache.org/jira/browse/SPARK-6299 issue, which I'm working on. I've submitted a PR for it; would you test it?

Regards,
Kevin

--
Ralph Bergmann
www: http://www.dasralph.de | http://www.the4thFloor.eu
mail: ra...@dasralph.de
skype: dasralph
facebook: https://www.facebook.com/dasralph
google+: https://plus.google.com/+RalphBergmann
xing: https://www.xing.com/profile/Ralph_Bergmann3
linkedin: https://www.linkedin.com/in/ralphbergmann
gulp: https://www.gulp.de/Profil/RalphBergmann.html
github: https://github.com/the4thfloor
pgp key id: 0x421F9B78
pgp fingerprint: CEE3 7AE9 07BE 98DF CD5A E69C F131 4A8E 421F 9B78
Hive error on partitioned tables
Hi. I'm running Spark 1.2.0. I have a HiveContext and I execute the following query:

select sum(field1 / 100) from table1 group by field2;

field1 in the Hive metastore is a smallint. The schema detected by the HiveContext is an int32:

fileSchema: message schema { optional int32 field1; }

If table1 is an unpartitioned table, it works well; however, if table1 is a partitioned table, it crashes in spark-submit. The error is the following:

java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Short
at scala.runtime.BoxesRunTime.unboxToShort(BoxesRunTime.java:102)
at scala.math.Numeric$ShortIsIntegral$.toInt(Numeric.scala:72)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToInt$6.apply(Cast.scala:234)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToInt$6.apply(Cast.scala:234)
at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:366)
at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:365)
at org.apache.spark.sql.catalyst.expressions.Expression.f1(Expression.scala:162)
at org.apache.spark.sql.catalyst.expressions.Divide.eval(arithmetic.scala:115)
at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:365)
at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:365)
at org.apache.spark.sql.catalyst.expressions.Expression.n2(Expression.scala:109)
at org.apache.spark.sql.catalyst.expressions.Add.eval(arithmetic.scala:90)
at org.apache.spark.sql.catalyst.expressions.Coalesce.eval(nullFunctions.scala:50)
at org.apache.spark.sql.catalyst.expressions.MutableLiteral.update(literals.scala:72)
at org.apache.spark.sql.catalyst.expressions.SumFunction.update(aggregates.scala:526)
at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:167)
at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$7.apply(Aggregate.scala:151)
at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/03/17 10:42:51 ERROR Executor: Exception in task 1.0 in stage 3.0 (TID 5)
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Short
at scala.runtime.BoxesRunTime.unboxToShort(BoxesRunTime.java:102)
at scala.math.Numeric$ShortIsIntegral$.toInt(Numeric.scala:72)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToInt$6.apply(Cast.scala:234)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToInt$6.apply(Cast.scala:234)
at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:366)
at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:365)
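An untested workaround sketch (my addition, not from the thread): widening the smallint column explicitly before the arithmetic may sidestep the Integer-to-Short unboxing on the partitioned read, assuming table1 is visible to the HiveContext:

import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
// Hypothetical workaround: cast field1 to int explicitly so the partition
// values (materialized as java.lang.Integer) are never unboxed as Short.
hc.sql("select sum(cast(field1 as int) / 100) from table1 group by field2").collect().foreach(println)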
Building Spark on Windows WAS: Any IRC channel on Spark?
Have you tried with the -X switch? Thanks On Mar 17, 2015, at 1:47 AM, Ahmed Nawar ahmed.na...@gmail.com wrote: Dears, are there any instructions to build Spark 1.3.0 on Windows 7? I tried mvn -Phive -Phive-thriftserver -DskipTests clean package but I got the errors below. [INFO] Spark Project Parent POM ... SUCCESS [ 7.845 s] [INFO] Spark Project Networking ... SUCCESS [ 26.209 s] [INFO] Spark Project Shuffle Streaming Service SUCCESS [ 9.701 s] [INFO] Spark Project Core . SUCCESS [04:29 min] [INFO] Spark Project Bagel SUCCESS [ 22.215 s] [INFO] Spark Project GraphX ... SUCCESS [ 59.676 s] [INFO] Spark Project Streaming SUCCESS [01:46 min] [INFO] Spark Project Catalyst . SUCCESS [01:40 min] [INFO] Spark Project SQL .. SUCCESS [03:05 min] [INFO] Spark Project ML Library ... FAILURE [03:49 min] [INFO] Spark Project Tools SKIPPED [INFO] Spark Project Hive . SKIPPED [INFO] Spark Project REPL . SKIPPED [INFO] Spark Project Hive Thrift Server ... SKIPPED [INFO] Spark Project Assembly . SKIPPED [INFO] Spark Project External Twitter . SKIPPED [INFO] Spark Project External Flume Sink .. SKIPPED [INFO] Spark Project External Flume ... SKIPPED [INFO] Spark Project External MQTT SKIPPED [INFO] Spark Project External ZeroMQ .. SKIPPED [INFO] Spark Project External Kafka ... SKIPPED [INFO] Spark Project Examples . SKIPPED [INFO] Spark Project External Kafka Assembly .. SKIPPED [INFO] [INFO] BUILD FAILURE [INFO] [INFO] Total time: 16:58 min [INFO] Finished at: 2015-03-17T11:04:40+03:00 [INFO] Final Memory: 77M/1840M [INFO] [ERROR] Failed to execute goal org.scalastyle:scalastyle-maven-plugin:0.4.0:check (default) on project spark-mllib_2.10: Failed during scalastyle execution: You have 1 Scalastyle violation(s). -> [Help 1] [ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. [ERROR] Re-run Maven using the -X switch to enable full debug logging. [ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles: [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException [ERROR] [ERROR] After correcting the problems, you can resume the build with the command [ERROR] mvn <goals> -rf :spark-mllib_2.10 On Tue, Mar 17, 2015 at 10:06 AM, Akhil Das ak...@sigmoidanalytics.com wrote: There's one on Freenode; you can join #Apache-Spark. There are like 60 people idling. :) Thanks Best Regards On Mon, Mar 16, 2015 at 10:46 PM, Feng Lin lfliu.x...@gmail.com wrote: Hi, everyone, I'm wondering whether there is a possibility to set up an official IRC channel on Freenode. I noticed that a lot of Apache projects have such a channel to let people talk directly. Best Michael
IllegalAccessError in GraphX (Spark 1.3.0 LDA)
Hi all, I'm trying to use the new LDA in mllib, but when trying to train the model, I'm getting the following error: java.lang.IllegalAccessError: tried to access class org.apache.spark.util.collection.Sorter from class org.apache.spark.graphx.impl.EdgePartitionBuilder at org.apache.spark.graphx.impl.EdgePartitionBuilder.toEdgePartition(EdgePartitionBuilder.scala:39) at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:109) Has anyone seen this yet and has an idea what might be the problem? It happens both with the provided sample data and with my own corpus. Full code + more stack below. Thx and Regards, Jeff Code: --

object LdaTest {
  def main(args: Array[String]) = {
    val conf = new SparkConf().setAppName("LDA").setMaster("local[4]")
    val sc = new SparkContext(conf)
    //val data = scala.io.Source.fromFile("/home/jeff/nmf_compare/scikit_v.txt").getLines().toList
    //val parsedData = data.map(s => Vectors.dense(s.trim().split(" ").map(_.toDouble)))
    //val corpus = parsedData.zipWithIndex.map( t => (t._2.toLong, t._1) )
    //val data = sc.textFile("/home/jeff/nmf_compare/scikit_v.txt")
    val data = sc.textFile("/home/jeff/Downloads/spark-1.3.0-bin-hadoop2.4/data/mllib/sample_lda_data.txt")
    val parsedData = data.map(s => Vectors.dense(s.trim().split(" ").map(_.toDouble)))
    val corpus = parsedData.zipWithIndex.map(_.swap).cache()
    //val parCorpus = sc.parallelize(corpus)
    //println(parCorpus)
    val ldaModel = new LDA().setK(10).run(corpus)
    println(ldaModel)
  }
}

Stack: ... 15/03/17 09:48:50 INFO spark.CacheManager: Partition rdd_8_0 not found, computing it 15/03/17 09:48:50 INFO spark.CacheManager: Partition rdd_8_1 not found, computing it 15/03/17 09:48:50 INFO spark.CacheManager: Another thread is loading rdd_8_0, waiting for it to finish... 15/03/17 09:48:50 INFO storage.BlockManager: Found block rdd_4_0 locally 15/03/17 09:48:50 INFO spark.CacheManager: Partition rdd_4_1 not found, computing it 15/03/17 09:48:50 INFO spark.CacheManager: Another thread is loading rdd_8_1, waiting for it to finish...
15/03/17 09:48:50 INFO rdd.HadoopRDD: Input split: file:/home/jeff/Downloads/spark-1.3.0-bin-hadoop2.4/data/mllib/sample_lda_data.txt:132+132 15/03/17 09:48:50 INFO storage.MemoryStore: ensureFreeSpace(1048) called with curMem=47264, maxMem=1965104824 15/03/17 09:48:50 INFO spark.CacheManager: Finished waiting for rdd_8_0 15/03/17 09:48:50 ERROR executor.Executor: Exception in task 0.0 in stage 3.0 (TID 3) java.lang.IllegalAccessError: tried to access class org.apache.spark.util.collection.Sorter from class org.apache.spark.graphx.impl.EdgePartitionBuilder at org.apache.spark.graphx.impl.EdgePartitionBuilder.toEdgePartition(EdgePartitionBuilder.scala:39) at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:109) at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:104) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:609) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:609) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.graphx.EdgeRDD.compute(EdgeRDD.scala:49) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/03/17 09:48:50 INFO spark.CacheManager: Whoever was loading rdd_8_0 failed; we'll try it ourselves 15/03/17 09:48:50 INFO storage.MemoryStore: Block rdd_4_1 stored as values in memory (estimated size 1048.0 B, free 1874.0 MB) 15/03/17 09:48:50 INFO spark.CacheManager: Partition rdd_8_0 not found, computing it 15/03/17 09:48:50 INFO storage.BlockManagerInfo: Added rdd_4_1 in memory on 10.2.200.66:51465 (size: 1048.0 B, free: 1874.1 MB) 15/03/17 09:48:50 INFO storage.BlockManager: Found block rdd_4_0 locally 15/03/17 09:48:50 INFO
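An IllegalAccessError on a private[spark] class like Sorter usually points at mixed Spark versions on the classpath: the application was compiled against one Spark version but runs on another. A hedged sbt sketch of the usual guard (the versions here are assumptions based on the 1.3.0 runtime in the thread):

// Match the compile-time Spark version to the runtime, and mark Spark
// "provided" so the application jar does not bundle a second copy of
// Spark's classes next to the cluster's own.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "1.3.0" % "provided",
  "org.apache.spark" %% "spark-mllib" % "1.3.0" % "provided"
)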
Re: MappedStream vs Transform API
That's not super essential, and hence hasn't been done till now. Even in core Spark there are MappedRDD, etc. even though all of them can be implemented by MapPartitionedRDD (maybe the name is wrong). So it's nice to maintain the consistency: MappedDStream creates MappedRDDs. :) Though this does not eliminate the possibility that we will do it. Maybe in the future, if we find that maintaining these different DStreams is becoming a maintenance burden (it isn't yet), we may collapse them to use transform. We did so in the Python API for exactly this reason. If you are interested in contributing to Spark Streaming, I can point you to a number of issues where your contributions will be more valuable. TD On Tue, Mar 17, 2015 at 1:56 AM, madhu phatak phatak@gmail.com wrote: Hi, Thank you for the response. Can I give a PR to use transform for all the functions like map, flatMap etc. so they are consistent with the other APIs? Regards, Madhukara Phatak http://datamantra.io/ On Mon, Mar 16, 2015 at 11:42 PM, Tathagata Das t...@databricks.com wrote: It's mostly for legacy reasons. First we had added all the MappedDStream, etc. and then later we realized we need to expose something that is more generic for arbitrary RDD-RDD transformations. It can be easily replaced. However, there is a slight value in having MappedDStream, for developers to learn about DStreams. TD On Mon, Mar 16, 2015 at 3:37 AM, madhu phatak phatak@gmail.com wrote: Hi, Thanks for the response. I understand that part. But I am asking why the internal implementation uses a subclass when it can use an existing API? Unless there is a real difference, it feels like code smell to me. Regards, Madhukara Phatak http://datamantra.io/ On Mon, Mar 16, 2015 at 2:14 PM, Shao, Saisai saisai.s...@intel.com wrote: I think these two ways are both OK for you to write streaming jobs; `transform` is a more general way for you to transform from one DStream to another if there's no related DStream API (but there is a related RDD API). But using map may be more straightforward and easy to understand. Thanks, Jerry From: madhu phatak [mailto:phatak@gmail.com] Sent: Monday, March 16, 2015 4:32 PM To: user@spark.apache.org Subject: MappedStream vs Transform API Hi, The current implementation of the map function in Spark Streaming looks as below:

def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
  new MappedDStream(this, context.sparkContext.clean(mapFunc))
}

It creates an instance of MappedDStream, which is a subclass of DStream. The same function can also be implemented using the transform API:

def map[U: ClassTag](mapFunc: T => U): DStream[U] =
  this.transform(rdd => rdd.map(mapFunc))

Both implementations look the same. If they are the same, is there any advantage to having a subclass of DStream? Why can't we just use the transform API? Regards, Madhukara Phatak http://datamantra.io/
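For a concrete feel of the equivalence TD describes, here is a hedged usage sketch (my addition; it assumes `lines` is a DStream[String] obtained from some StreamingContext):

// Built-in DStream API
val words1 = lines.flatMap(_.split(" "))
// The same thing expressed through transform, which accepts any RDD-to-RDD function
val words2 = lines.transform(rdd => rdd.flatMap(_.split(" ")))

Both produce identical DStreams; transform simply exposes the per-batch RDD directly.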
Re: Building Spark on Windows WAS: Any IRC channel on Spark?
Dear Yu, with -X I got the error below. [INFO] [INFO] Reactor Summary: [INFO] [INFO] Spark Project Parent POM ... SUCCESS [ 7.418 s] [INFO] Spark Project Networking ... SUCCESS [ 16.551 s] [INFO] Spark Project Shuffle Streaming Service SUCCESS [ 10.392 s] [INFO] Spark Project Core . SUCCESS [04:26 min] [INFO] Spark Project Bagel SUCCESS [ 23.876 s] [INFO] Spark Project GraphX ... SUCCESS [01:02 min] [INFO] Spark Project Streaming SUCCESS [01:46 min] [INFO] Spark Project Catalyst . SUCCESS [01:45 min] [INFO] Spark Project SQL .. SUCCESS [02:16 min] [INFO] Spark Project ML Library ... FAILURE [02:38 min] [INFO] Spark Project Tools SKIPPED [INFO] Spark Project Hive . SKIPPED [INFO] Spark Project REPL . SKIPPED [INFO] Spark Project Hive Thrift Server ... SKIPPED [INFO] Spark Project Assembly . SKIPPED [INFO] Spark Project External Twitter . SKIPPED [INFO] Spark Project External Flume Sink .. SKIPPED [INFO] Spark Project External Flume ... SKIPPED [INFO] Spark Project External MQTT SKIPPED [INFO] Spark Project External ZeroMQ .. SKIPPED [INFO] Spark Project External Kafka ... SKIPPED [INFO] Spark Project Examples . SKIPPED [INFO] Spark Project External Kafka Assembly .. SKIPPED [INFO] [INFO] BUILD FAILURE [INFO] [INFO] Total time: 14:54 min [INFO] Finished at: 2015-03-17T12:54:19+03:00 [INFO] Final Memory: 76M/1702M [INFO] [ERROR] Failed to execute goal org.scalastyle:scalastyle-maven-plugin:0.4.0:check (default) on project spark-mllib_2.10: Failed during scalastyle execution: You have 1 Scalastyle violation(s). -> [Help 1] org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.scalastyle:scalastyle-maven-plugin:0.4.0:check (default) on project spark-mllib_2.10: Failed during scalastyle execution at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:216) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80) at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51) at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:120) at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:355) at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:155) at org.apache.maven.cli.MavenCli.execute(MavenCli.java:584) at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:216) at org.apache.maven.cli.MavenCli.main(MavenCli.java:160) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289) at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229) at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415) at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356) Caused by: org.apache.maven.plugin.MojoExecutionException: Failed during scalastyle execution at org.scalastyle.maven.plugin.ScalastyleViolationCheckMojo.performCheck(ScalastyleViolationCheckMojo.java:238) at
org.scalastyle.maven.plugin.ScalastyleViolationCheckMojo.execute(ScalastyleViolationCheckMojo.java:199) at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208) ... 19 more Caused by: org.apache.maven.plugin.MojoFailureException: You have 1 Scalastyle violation(s).
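A hedged workaround (my note, not from the thread): on Windows these Scalastyle failures are often triggered by CRLF line endings or encoding differences in the checkout, and the offending file should be listed in the plugin's scalastyle-output.xml report under the mllib module. If the goal is just to get a build out, the check can be skipped, assuming the plugin's standard skip property:

mvn -Phive -Phive-thriftserver -DskipTests -Dscalastyle.skip=true clean package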
Should I do spark-sql query on HDFS or apache hive?
Hi, everybody. I am new to Spark. I want to do interactive SQL queries using Spark SQL. Spark SQL can run on top of Hive or load files directly from HDFS. Which is better or faster? Thanks.
Re: MappedStream vs Transform API
Hi, sorry for the wrong formatting in the earlier mail. On Tue, Mar 17, 2015 at 2:31 PM, Tathagata Das t...@databricks.com wrote: That's not super essential, and hence hasn't been done till now. Even in core Spark there are MappedRDD, etc. even though all of them can be implemented by MapPartitionedRDD (maybe the name is wrong). So it's nice to maintain the consistency: MappedDStream creates MappedRDDs. :) Though this does not eliminate the possibility that we will do it. Maybe in the future, if we find that maintaining these different DStreams is becoming a maintenance burden (it isn't yet), we may collapse them to use transform. We did so in the Python API for exactly this reason. Ok. When I was going through the source code it was confusing to understand what the right extension points were, so I thought whoever goes through the code may get into the same situation. But if it's not super essential then ok. If you are interested in contributing to Spark Streaming, I can point you to a number of issues where your contributions will be more valuable. That will be great. Regards, Madhukara Phatak http://datamantra.io/
Re: Spark SQL UDT Kryo serialization, Unable to find class
I'll caution you that this is not a stable public API. That said, it seems the issue is that you have not copied the jar file containing your class to all of the executors. You should not need to do any special configuration of serialization (you can't for SQL, as we hard-code it for performance, since we generally know all the types that are going to be shipped). On Tue, Mar 17, 2015 at 5:17 AM, zia_kayani zia.kay...@platalytics.com wrote: Hi, I want to introduce a custom type for SchemaRDD. I'm following this https://github.com/apache/spark/blob/branch-1.2/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala example, but I'm having Kryo serialization issues. Here is the stack trace: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 22, localhost): com.esotericsoftware.kryo.KryoException: Unable to find class: com.gis.io.GeometryWritable Serialization trace: value (org.apache.spark.sql.catalyst.expressions.MutableAny) values (org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42) at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:80) at org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:46) at org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:45) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:228) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at
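A minimal sketch of shipping the UDT's jar to the executors, per Michael's suggestion (the jar path is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("udt-app")
  // Distribute the jar containing com.gis.io.GeometryWritable to every executor
  .setJars(Seq("/path/to/geometry-udt.jar"))
val sc = new SparkContext(conf)

The same effect can be had on the command line via spark-submit's --jars option.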
Re: Need Advice about reading lots of text files
I agree that it would be better if Spark did a better job automatically here, though doing so is probably a non-trivial amount of work. My code is certainly worse if you have only a few very large text files, for example, and thus I'd generally encourage people to try the built-in options first. However, one of the nice things about Spark is the flexibility it gives you: when you are trying to read 100,000s of tiny files, this works pretty well. I'll also comment that this does not create a task per file, and that is another reason it's faster for the many-small-files case. Of course that comes at the expense of locality (which doesn't matter for my use case on S3 anyway)...
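For readers who do not want to chase the gists, here is a hedged inline sketch of the same read-inside-the-job pattern (the paths are placeholders; ReadLinesSafe in the linked gist additionally guards against unreadable files):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import scala.io.Source

val paths = Seq("s3n://mybucket/file1", "s3n://mybucket/file2") // placeholder paths
val lines = sc.parallelize(paths, paths.size).flatMap { p =>
  val path = new Path(p)
  val fs = path.getFileSystem(new Configuration())
  val in = fs.open(path)
  // Force the lines eagerly so the stream can be closed inside the task
  try Source.fromInputStream(in).getLines().toList finally in.close()
}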
Re: High GC time
The official guide may help: http://spark.apache.org/docs/latest/tuning.html#garbage-collection-tuning -Xiangrui On Tue, Mar 17, 2015 at 8:27 AM, jatinpreet jatinpr...@gmail.com wrote: Hi, I am getting very high GC time in my jobs. For smaller/real-time loads, this becomes a real problem. Below are the details of the tasks I just ran. What could be the cause of such skewed GC times?

Index  ID    Attempt  Status   Locality Level  Executor/Host  Launch Time          Duration  GC Time  Input     Shuffle Write
36     2601  0        SUCCESS  PROCESS_LOCAL   2 / Slave1     2015/03/17 11:18:44  20 s      11 s     132.7 KB  135.8 KB
37     2602  0        SUCCESS  PROCESS_LOCAL   2 / Slave1     2015/03/17 11:18:44  15 s      11 s     79.4 KB   82.5 KB
38     2603  0        SUCCESS  PROCESS_LOCAL   1 / Slave2     2015/03/17 11:18:44  2 s       0.7 s    0.0 B     37.8 KB
39     2604  0        SUCCESS  PROCESS_LOCAL   0 / slave3     2015/03/17 11:18:45  21 s      18 s     77.9 KB   79.8 KB
40     2605  0        SUCCESS  PROCESS_LOCAL   2 / Slave1     2015/03/17 11:18:45  14 s      10 s     73.0 KB   74.9 KB
41     2606  0        SUCCESS  PROCESS_LOCAL   2 / Slave1     2015/03/17 11:18:45  14 s      10 s     74.4 KB   76.5 KB
42     2607  0        SUCCESS  PROCESS_LOCAL   0 / Slave3     2015/03/17 11:18:45  12 s      12 s     10.9 KB   12.8 KB

Thanks
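For illustration, the kind of knobs that guide covers, sketched as SparkConf settings (the specific flags and fraction here are assumptions, not a recommendation from the thread):

val conf = new org.apache.spark.SparkConf()
  // Surface GC activity in executor logs so the long pauses can be attributed
  .set("spark.executor.extraJavaOptions",
       "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
  // A smaller cache fraction leaves more heap for task execution,
  // which can reduce GC pressure during shuffles
  .set("spark.storage.memoryFraction", "0.4")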
Re: Should I do spark-sql query on HDFS or apache hive?
The performance has more to do with the particular format you are using than with where the metadata is coming from; even Hive tables are usually read from files on HDFS. You should probably use HiveContext, as its query language is more powerful than SQLContext's. Also, Parquet is usually the faster data format for Spark SQL. On Tue, Mar 17, 2015 at 3:41 AM, 李铖 lidali...@gmail.com wrote: Hi, everybody. I am new to Spark. I want to do interactive SQL queries using Spark SQL. Spark SQL can run on top of Hive or load files directly from HDFS. Which is better or faster? Thanks.
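A minimal sketch of the suggested combination (the path and table name are placeholders):

import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
// Parquet read through a HiveContext: HiveQL plus a fast columnar format
val events = hc.parquetFile("hdfs:///data/events.parquet")
events.registerTempTable("events")
hc.sql("SELECT count(*) FROM events").collect().foreach(println)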
Re: HiveContext can't find registered function
Interesting, I thought the problem was with the method itself. I will check it soon and update. Can you elaborate on what the # and the number mean? Is that a reference to the field in the RDD? Thank you, Ophir On Mar 17, 2015 7:06 PM, Yin Huai yh...@databricks.com wrote: It seems basic_null_diluted_d was not resolved? Can you check if basic_null_diluted_d is in your table? On Tue, Mar 17, 2015 at 9:34 AM, Ophir Cohen oph...@gmail.com wrote: Hi Guys, I'm registering a function using: sqlc.registerFunction("makeEstEntry", ReutersDataFunctions.makeEstEntry _) Then I register the table and try to query the table using that function and I get: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'makeEstEntry(numest#20,median#21,mean#22,stddev#23,high#24,low#25,currency_#26,units#27,'basic_null_diluted_d) AS FY0#2837, tree: Thanks! Ophir
Re: TreeNodeException: Unresolved plan found
Ok, I managed to solve it. As the issue in JIRA suggests, it is fixed in 1.2.1; I probably had some old jars in the classpath. Cleaning everything and rebuilding eventually solved the problem. On Mar 17, 2015 12:25 PM, Ophir Cohen oph...@gmail.com wrote: Hi Guys, and great job! I encountered a weird problem in local mode and I'll be glad to solve it out... When trying to save a SchemaRDD into a Hive table it fails with 'TreeNodeException: Unresolved plan found'. I found a similar issue in Jira: https://issues.apache.org/jira/browse/SPARK-4825 but I'm using Spark 1.2.1 and I get the same error. In cluster mode it works as it should but fails in local mode. The code I'm using:

val hc = new HiveContext(new SparkContext(new SparkConf().setMaster("local[*]").setAppName("test-app")))
val file = hc.parquetFile("path to my file")
file.saveAsTable("my_table_name")

And I get the following error: An exception or error caused a run to abort: Unresolved plan found, tree: 'CreateTableAsSelect None, dailyprice, false, None ParquetRelation /home/ophchu/opr/repos/opr-spark/src/test/resources/aapl/derived/splits_divs/reuters/split_adj.pq/part-r-1.parquet, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.hive.HiveContext@a02632b, [] org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved plan found, tree: 'CreateTableAsSelect None, dailyprice, false, None ParquetRelation /home/ophchu/opr/repos/opr-spark/src/test/resources/aapl/derived/splits_divs/reuters/split_adj.pq/part-r-1.parquet, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.hive.HiveContext@a02632b, [] Again, it happens only when running in local mode. Thanks! Ophir
Re: RDD to DataFrame for using ALS under org.apache.spark.ml.recommendation.ALS
Please check this section in the user guide: http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection You need `import sqlContext.implicits._` to use `toDF()`. -Xiangrui On Mon, Mar 16, 2015 at 2:34 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi Xiangrui, Thanks a lot for the quick reply. I am still facing an issue. I have tried the code snippet that you have suggested: val ratings = purchase.map { line => line.split(',') match { case Array(user, item, rate) => (user.toInt, item.toInt, rate.toFloat) }.toDF("user", "item", "rate")} For this, I got the below error: error: ';' expected but '.' found. [INFO] }.toDF("user", "item", "rate")} [INFO] ^ When I tried the below code: val ratings = purchase.map ( line => line.split(',') match { case Array(user, item, rate) => (user.toInt, item.toInt, rate.toFloat) }).toDF("user", "item", "rate") error: value toDF is not a member of org.apache.spark.rdd.RDD[(Int, Int, Float)] [INFO] possible cause: maybe a semicolon is missing before `value toDF'? [INFO] }).toDF("user", "item", "rate") I have looked at the document that you have shared and tried the following code: case class Record(user: Int, item: Int, rate: Double) val ratings = purchase.map(_.split(',')).map(r => Record(r(0).toInt, r(1).toInt, r(2).toDouble)).toDF("user", "item", "rate") For this, I got the below error: error: value toDF is not a member of org.apache.spark.rdd.RDD[Record] Appreciate your help! Thanks, Jay On Mar 16, 2015, at 11:35 AM, Xiangrui Meng men...@gmail.com wrote: Try this: val ratings = purchase.map { line => line.split(',') match { case Array(user, item, rate) => (user.toInt, item.toInt, rate.toFloat) }.toDF("user", "item", "rate") Doc for DataFrames: http://spark.apache.org/docs/latest/sql-programming-guide.html -Xiangrui On Mon, Mar 16, 2015 at 9:08 AM, jaykatukuri jkatuk...@apple.com wrote: Hi all, I am trying to use the new ALS implementation under org.apache.spark.ml.recommendation.ALS. The new method to invoke for training seems to be override def fit(dataset: DataFrame, paramMap: ParamMap): ALSModel. How do I create a DataFrame object from a ratings data set that is on HDFS? Whereas the method in the old ALS implementation under org.apache.spark.mllib.recommendation.ALS was def train( ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, blocks: Int, seed: Long ): MatrixFactorizationModel My code to run the old ALS train method is as below: val sc = new SparkContext(conf) val pfile = args(0) val purchase = sc.textFile(pfile) val ratings = purchase.map(_.split(',') match { case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toInt) }) val model = ALS.train(ratings, rank, numIterations, 0.01) Now, for the new ALS fit method, I am trying to use the below code to run, but getting a compilation error: val als = new ALS() .setRank(rank) .setRegParam(regParam) .setImplicitPrefs(implicitPrefs) .setNumUserBlocks(numUserBlocks) .setNumItemBlocks(numItemBlocks) val sc = new SparkContext(conf) val pfile = args(0) val purchase = sc.textFile(pfile) val ratings = purchase.map(_.split(',') match { case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toInt) }) val model = als.fit(ratings.toDF()) I get an error that the method toDF() is not a member of org.apache.spark.rdd.RDD[org.apache.spark.ml.recommendation.ALS.Rating[Int]]. Appreciate the help!
Thanks, Jay
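Putting Xiangrui's two hints together, a hedged corrected version of the snippet (same column names as in the thread; the import is the key fix):

import sqlContext.implicits._ // brings toDF() into scope for RDDs of tuples

val ratings = purchase.map { line =>
  line.split(',') match {
    case Array(user, item, rate) => (user.toInt, item.toInt, rate.toFloat)
  }
} // close the map block before calling toDF
val df = ratings.toDF("user", "item", "rate")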
Re: Garbage stats in Random Forest leaf node?
This is the default value (Double.MinValue) for invalid gain: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/tree/model/InformationGainStats.scala#L67 Please ignore it. Maybe we should update `toString` to use scientific notation. -Xiangrui On Mon, Mar 16, 2015 at 5:19 PM, cjwang c...@cjwang.us wrote: I dumped the trees in the random forest model and occasionally saw a leaf node with strange stats: - pred=1.00 prob=0.80 imp=-1.00 gain=-17976931348623157.00 Here impurity = -1 and gain = a giant negative number. Normally, I would get a None from Node.stats at a leaf node. Here it printed because Some(s) matches: node.stats match { case Some(s) => println(" imp=%f gain=%f".format(s.impurity, s.gain)) case None => println() } Is it a bug? This doesn't seem to happen in the model from DecisionTree, but my data sets are limited.
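A hedged tweak to the dump loop (assuming the same Node API as above): filtering the sentinel keeps the garbage values out of the printout.

node.stats match {
  // Double.MinValue marks invalid gain, so skip the sentinel stats
  case Some(s) if s.gain != Double.MinValue =>
    println(" imp=%f gain=%f".format(s.impurity, s.gain))
  case _ =>
    println()
}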
Re: Need Advice about reading lots of text files
There are no doubt many things that feed into the right way to read a lot of files into Spark. But why force users to learn all of those factors instead of putting an optimizer layer into the read inside Spark? BTW, I realize your method is not one task per file; it's chunked and done in parallel. It looks good for text and I may use it, but what about sequence files or JSON SchemaRDD/DataFrame reading? These will all have the same issue, and they are also likely to be in very many small files given the increasing popularity of Spark Streaming. It also seems like an optimizer would work in a very similar way for these. +1 for read optimizer :-)
Re: HiveContext can't find registered function
The number is an id we use internally to identify a resolved Attribute. It looks like basic_null_diluted_d was not resolved, since there is no id associated with it.
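A hedged first check (the table name is a placeholder): print the schema the analyzer actually sees and confirm the column exists with that exact spelling.

// Unresolved attributes usually mean the name is absent from the schema
sqlc.sql("SELECT * FROM my_table").printSchema()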
Set spark.fileserver.uri on private cluster
Hi, I have a private cluster with private IPs (192.168.*.*) and a gateway node with both a private IP (192.168.*.*) and a public internet IP. I set up the Spark master on the gateway node and set SPARK_MASTER_IP to the private IP. I start Spark workers on the private nodes. It works fine. The problem is with spark-shell. I start it from the gateway node with --master and --conf spark.driver.host using the private IP. The shell starts alright, but when I try to run a job I get Connection refused errors from the RDD. I checked the Environment page for the shell and noticed that spark.fileserver.uri and spark.repl.class.uri are both using the public IP of the gateway, while spark.driver.host is using the private IP as expected. Setting spark.fileserver.uri or spark.repl.class.uri with --conf does not help. It seems these values are not read but calculated. Thanks! Rares
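One hedged thing to try (my suggestion, not a confirmed fix): the fileserver and REPL class server bind to the address Spark auto-detects for the local host, and that detection can be overridden with SPARK_LOCAL_IP before launching the shell:

export SPARK_LOCAL_IP=192.168.1.10   # hypothetical private IP of the gateway
./bin/spark-shell --master spark://192.168.1.10:7077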
Re: Spark will process _temporary folder on S3 is very slow and always cause failure
Actually, this is the more relevant JIRA (which is resolved): https://issues.apache.org/jira/browse/SPARK-3595. SPARK-6352 is about saveAsParquetFile, which is not in use here. Here is a DirectOutputCommitter implementation: https://gist.github.com/aarondav/c513916e72101bbe14ec and it can be configured in Spark with: sparkConf.set("spark.hadoop.mapred.output.committer.class", classOf[DirectOutputCommitter].getName)
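For reference, a condensed sketch of what such a committer looks like (the linked gist is the authoritative version; this summary is my own):

import org.apache.hadoop.mapred.{JobContext, OutputCommitter, TaskAttemptContext}

// Tasks write directly to the final location, so there is no _temporary
// rename pass. Only safe when tasks write to distinct paths and
// speculative execution is disabled.
class DirectOutputCommitter extends OutputCommitter {
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
}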
saveAsTable fails to save RDD in Spark SQL 1.3.0
Hi, basically my goal is to make the Spark SQL RDDs available to Tableau software through the Simba ODBC driver. I'm running standalone Spark 1.3.0 on Ubuntu 14.04. I got the source code and compiled it with Maven. Hive is also set up and connected to MySQL, all on the same machine. The hive-site.xml file has been copied to spark/conf. Here is the content of hive-site.xml:

<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:MySql://localhost:3306/metastore_db?createDatabaseIfNotExist=true</value>
    <description>metadata is stored in a MySQL server</description>
  </property>
  <property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>MySQL JDBC driver class</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hiveuser</value>
    <description>user name for connecting to mysql server</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hivepassword</value>
    <description>password for connecting to mysql server</description>
  </property>
</configuration>

Both Hive and MySQL work just fine: I can create a table with Hive and find it in MySQL. The thriftserver is also configured and connected to the Spark master. Everything works just fine and I can monitor all the workers and running applications through the Spark master UI. I have a very simple Python script to convert a JSON file to an RDD like this:

import json

def transform(data):
    ts = data[:25].strip()
    jss = data[41:].strip()
    jsj = json.loads(jss)
    jsj['ts'] = ts
    return json.dumps(jsj)

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
rdd = sc.textFile("myfile")
tbl = sqlContext.jsonRDD(rdd.map(transform))
tbl.saveAsTable("neworder")

The saveAsTable fails with this: 15/03/17 17:22:17 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/opt/spark/python/pyspark/sql/dataframe.py", line 191, in saveAsTable self._jdf.saveAsTable(tableName, source, jmode, joptions) File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o31.saveAsTable.
: java.io.IOException: Failed to rename DeprecatedRawLocalFileStatus{path=file:/user/hive/warehouse/neworder/_temporary/0/task_201503171618_0008_r_01/part-r-2.parquet; isDirectory=false; length=5591; replication=1; blocksize=33554432; modification_time=142663430; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to file:/user/hive/warehouse/neworder/part-r-2.parquet at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310) at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:43) at org.apache.spark.sql.parquet.ParquetRelation2.insert(newParquet.scala:649) at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:126) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:308) at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:217) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:55) at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:55) at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:65) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1088) at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1088) at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1048) at org.apache.spark.sql.DataFrame.saveAsTable(DataFrame.scala:1018) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at
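A hedged observation (my note, not from the thread): the failing rename targets file:/user/hive/warehouse, i.e. the driver's local filesystem, so this often comes down to that directory missing or lacking write permission for the user running Spark. One assumption-laden alternative is pointing the warehouse at HDFS in hive-site.xml (the URL here is a placeholder):

<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>hdfs://localhost:9000/user/hive/warehouse</value>
</property>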
Re: Question on Spark 1.3 SQL External Datasource
Thanks Cheng for the clarification. Looking forward to the new API mentioned below. Yang Sent from my iPad On Mar 17, 2015, at 8:05 PM, Cheng Lian lian.cs@gmail.com wrote: Hey Yang, My comments are inlined below. Cheng On 3/18/15 6:53 AM, Yang Lei wrote: Hello, I am migrating my Spark SQL external data source integration from Spark 1.2.x to Spark 1.3. I noticed there are a couple of new filters now, e.g. org.apache.spark.sql.sources.And. However, for a SQL query with condition A AND B, I noticed PrunedFilteredScan.buildScan still gets an Array[Filter] with the two filters A and B, while I had expected to get one And filter with left == A and right == B. So my first question is: where can I find the rules for converting a SQL condition into the filters passed to PrunedFilteredScan.buildScan? Top-level AND predicates are always broken into smaller sub-predicates. The And filter that appears in the external data sources API is for nested predicates, like A OR (NOT (B AND C)). I do like what I see in these And, Or, Not filters, where we allow recursive nested definitions to connect filters together. If this is the direction we are heading, my second question is: do we just need one Filter object instead of an Array[Filter] in buildScan? For data sources with further filter push-down ability (e.g. Parquet), breaking down top-level AND predicates for them can be convenient. The third question is: what is our plan for allowing a relation provider to inform Spark which filters are handled already, so that there is no redundant filtering? Yeah, this is a good point. I guess we can add some method like filterAccepted to PrunedFilteredScan. Appreciate comments and links to any existing documentation or discussion. Yang
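To make the nested shape concrete, a hedged sketch of a recursive translator a relation provider might write (the filter classes are the real org.apache.spark.sql.sources ones in 1.3; the string predicate target is hypothetical):

import org.apache.spark.sql.sources._

// Recursively turn Spark SQL source filters into a data-source-specific
// predicate string; unsupported filters yield None so Spark can re-apply them.
def compile(f: Filter): Option[String] = f match {
  case EqualTo(attr, value)     => Some(s"$attr = '$value'")
  case GreaterThan(attr, value) => Some(s"$attr > '$value'")
  case And(left, right)         => for (l <- compile(left); r <- compile(right)) yield s"($l AND $r)"
  case Or(left, right)          => for (l <- compile(left); r <- compile(right)) yield s"($l OR $r)"
  case Not(child)               => compile(child).map(c => s"(NOT $c)")
  case _                        => None
}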
Re: InvalidAuxServiceException in dynamicAllocation
I assume you're running YARN given the exception. I don't know if this is covered in the documentation (I took a quick look at the config document and didn't see references to it), but you need to configure Spark's external shuffle service as an auxiliary NodeManager service in your YARN cluster. That involves deploying the Spark shuffle service jar to all the NMs and changing YARN's configuration to start the service (which should be called spark_shuffle). Please look at YARN's docs for details about how to set it up. On Tue, Mar 17, 2015 at 7:07 PM, Sea 261810...@qq.com wrote: Hi, all: Spark 1.3.0, Hadoop 2.2.0. I put the following params in spark-defaults.conf: spark.dynamicAllocation.enabled true spark.dynamicAllocation.minExecutors 20 spark.dynamicAllocation.maxExecutors 300 spark.dynamicAllocation.executorIdleTimeout 300 spark.shuffle.service.enabled true I started the thriftserver and ran a query. An exception happened! I found it in JIRA https://issues.apache.org/jira/browse/SPARK-5759 and it says fix version 1.3.0. Caused by: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:spark_shuffle does not exist at sun.reflect.GeneratedConstructorAccessor28.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27) at java.lang.reflect.Constructor.newInstance(Constructor.java:513) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:152) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106) at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:203) at org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:113) ... 4 more -- Marcelo
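For reference, the NodeManager side of that setup usually looks like the following yarn-site.xml fragment (a sketch based on the standard YarnShuffleService wiring; verify the class name against your Spark version's docs):

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>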
Re: StorageLevel: OFF_HEAP
Ranga: Take a look at https://github.com/apache/spark/pull/4867 Cheers On Tue, Mar 17, 2015 at 6:08 PM, fightf...@163.com fightf...@163.com wrote: Hi, Ranga That's true. Typically a version mismatch issue. Note that spark 1.2.1 has Tachyon built in with version 0.5.0, so I think you may need to rebuild Spark against your current Tachyon release. We had used Tachyon for several of our spark projects in a production environment. Thanks, Sun. -- fightf...@163.com From: Ranga sra...@gmail.com Date: 2015-03-18 06:45 To: user@spark.apache.org Subject: StorageLevel: OFF_HEAP Hi I am trying to use the OFF_HEAP storage level in my Spark (1.2.1) cluster. The Tachyon (0.6.0-SNAPSHOT) nodes seem to be up and running. However, when I try to persist the RDD, I get the following error: ERROR [2015-03-16 22:22:54,017] ({Executor task launch worker-0} TachyonFS.java[connect]:364) - Invalid method name: 'getUserUnderfsTempFolder' ERROR [2015-03-16 22:22:54,050] ({Executor task launch worker-0} TachyonFS.java[getFileId]:1020) - Invalid method name: 'user_getFileId' Is this because of a version mismatch? On a different note, I was wondering if Tachyon has been used in a production environment by anybody in this group? Appreciate your help with this. - Ranga
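For context, a minimal sketch of how OFF_HEAP storage is wired up on the Spark 1.2.x side (host/port and paths are placeholders; the Tachyon client version baked into Spark must match the Tachyon cluster, which is the point of the thread):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("OffHeapExample")
  .set("spark.tachyonStore.url", "tachyon://tachyon-master:19998") // placeholder master

val sc = new SparkContext(conf)
val rdd = sc.textFile("hdfs:///data/input")
rdd.persist(StorageLevel.OFF_HEAP) // blocks go to Tachyon instead of the JVM heap
rdd.count()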
Re: Downloading data from url
The methods I mentioned are private. But I hope they give you some idea how downloading from a URL works. Cheers On Tue, Mar 17, 2015 at 7:01 AM, Ted Yu yuzhih...@gmail.com wrote: Please take a look at the downloadFile() method in ./core/src/main/scala/org/apache/spark/util/Utils.scala You can find usage in doFetchFile(). FYI On Tue, Mar 17, 2015 at 6:52 AM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: Hi experts! Is there any API in Spark to download data from a URL? I want to download data from a URL in a Spark application, and I want the download to happen on all nodes instead of a single node. Thanks
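Since the Utils methods are private and run on the driver, one way to get the download to happen on the executors is to parallelize the URL list and read inside a transformation. A minimal sketch (URLs and output path are placeholders; a real job would want retries and stream cleanup):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("UrlFetch"))

// Placeholder URLs; each partition's URLs are fetched by whichever executor runs it.
val urls = Seq("http://example.com/a.txt", "http://example.com/b.txt")

val lines = sc.parallelize(urls, urls.size).flatMap { url =>
  // toList materializes the lines before the underlying stream goes away
  scala.io.Source.fromURL(url).getLines().toList
}
lines.saveAsTextFile("hdfs:///tmp/fetched")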
InvalidAuxServiceException in dynamicAllocation
Hi, all: Spark 1.3.0, Hadoop 2.2.0. I put the following params in the spark-defaults.conf spark.dynamicAllocation.enabled true spark.dynamicAllocation.minExecutors 20 spark.dynamicAllocation.maxExecutors 300 spark.dynamicAllocation.executorIdleTimeout 300 spark.shuffle.service.enabled true I started the thriftserver and ran a query. An exception happened! I found it in JIRA https://issues.apache.org/jira/browse/SPARK-5759 It says the fixed version is 1.3.0. Caused by: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:spark_shuffle does not exist at sun.reflect.GeneratedConstructorAccessor28.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27) at java.lang.reflect.Constructor.newInstance(Constructor.java:513) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:152) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106) at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:203) at org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:113) ... 4 more
StorageLevel: OFF_HEAP
Hi, I am trying to use the OFF_HEAP storage level in my Spark (1.2.1) cluster. The Tachyon (0.6.0-SNAPSHOT) nodes seem to be up and running. However, when I try to persist the RDD, I get the following error: ERROR [2015-03-16 22:22:54,017] ({Executor task launch worker-0} TachyonFS.java[connect]:364) - Invalid method name: 'getUserUnderfsTempFolder' ERROR [2015-03-16 22:22:54,050] ({Executor task launch worker-0} TachyonFS.java[getFileId]:1020) - Invalid method name: 'user_getFileId' Is this because of a version mismatch? On a different note, I was wondering if Tachyon has been used in a production environment by anybody in this group? Appreciate your help with this. - Ranga
Question on Spark 1.3 SQL External Datasource
Hello, I am migrating my Spark SQL external datasource integration from Spark 1.2.x to Spark 1.3. I noticed there are a couple of new filters now, e.g. org.apache.spark.sql.sources.And. However, for a SQL query with condition A AND B, I noticed PrunedFilteredScan.buildScan still gets an Array[Filter] with 2 filters, A and B, whereas I expected to get one And filter with left == A and right == B. So my first question is: where can I find the rules for converting a SQL condition into the filters passed to PrunedFilteredScan.buildScan? I do like what I see in these And, Or, Not filters, which allow recursive nesting to connect filters together. If this is the direction we are heading in, my second question is: do we just need one Filter object instead of an Array[Filter] in buildScan? The third question is: what is the plan for allowing a relation provider to inform Spark which filters are handled already, so that there is no redundant filtering? Appreciate comments and links to any existing documentation or discussion. Yang
Re: Should I do spark-sql query on HDFS or apache hive?
I am trying to explain that these are not either/or decisions. You are likely going to be storing the data on HDFS no matter what other choices you make. You can use parquet to store the data whether you are addressing files directly on HDFS or using the Hive Metastore to locate the underlying files by table name. Parquet is likely faster than the default format for Hive tables, but with Hive you can say STORED AS PARQUET too. I suggest you look at the programming guide: http://spark.apache.org/docs/latest/sql-programming-guide.html Michael On Tue, Mar 17, 2015 at 5:10 PM, 李铖 lidali...@gmail.com wrote: Did you mean that parquet is faster than the Hive default format, and the Hive format is faster than plain HDFS files, for Spark SQL? :) 2015-03-18 1:23 GMT+08:00 Michael Armbrust mich...@databricks.com: The performance has more to do with the particular format you are using, not where the metadata is coming from. Even Hive tables are usually read from files on HDFS. You probably should use HiveContext as its query language is more powerful than SQLContext. Also, parquet is usually the faster data format for Spark SQL. On Tue, Mar 17, 2015 at 3:41 AM, 李铖 lidali...@gmail.com wrote: Hi, everybody. I am new to Spark. Now I want to do interactive SQL queries using Spark SQL. Spark SQL can run on Hive tables or load files from HDFS directly. Which is better or faster? Thanks.
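A minimal sketch of the point (paths and table names are placeholders; an existing SparkContext sc is assumed): the same parquet files can be queried directly or through a Hive table, so the format choice matters more than where the metadata lives.

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)

// Query parquet files directly, no metastore involved:
val df = hiveCtx.parquetFile("hdfs:///data/events.parquet")
df.registerTempTable("events")
hiveCtx.sql("SELECT COUNT(*) FROM events").collect()

// Or keep the same columnar format behind a Hive-managed table
// (assuming your Hive version supports STORED AS PARQUET):
hiveCtx.sql("CREATE TABLE events_hive STORED AS PARQUET AS SELECT * FROM events")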
Idempotent count
Hi all, I am new to Spark so please forgive me if my question is stupid. I am trying to use Spark Streaming in an application that reads data from a queue (Kafka), does some aggregation (sum, count, ...), and then persists the result to an external storage system (MySQL, VoltDB...). From my understanding of Spark Streaming, there are two ways of doing aggregation: - Stateless: I don't have to keep state and just apply new delta values to the external system. From my understanding, doing it this way I may end up over-counting when there is a failure and replay. - Stateful: use checkpointing to keep state and blindly save the new state to the external system. Doing it this way I get correct aggregation results, but I have to keep data in two places (the state and the external system). My questions are: - Is my understanding of stateless and stateful aggregation correct? If not please correct me! - For stateful aggregation, what does Spark Streaming keep when it saves a checkpoint? Please kindly help! Thanks -Binh
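To make the stateful variant concrete, a minimal sketch of a running count kept with updateStateByKey (the socket source stands in for Kafka; paths are placeholders). Checkpointing is what lets the state survive a failure, and because the stream carries the full aggregate, the external write can be an idempotent upsert rather than a delta:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("IdempotentCount")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///tmp/checkpoints") // required for stateful operations

// Stand-in source; in the real job this would be the Kafka stream.
val words = ssc.socketTextStream("localhost", 9999).map(w => (w, 1L))

// New running total = sum of this batch's values + previous total.
def updateCount(newValues: Seq[Long], state: Option[Long]): Option[Long] =
  Some(newValues.sum + state.getOrElse(0L))

// counts carries the full aggregate, so the write to MySQL/VoltDB can be an
// idempotent upsert (e.g. INSERT ... ON DUPLICATE KEY UPDATE) instead of a delta.
val counts = words.updateStateByKey(updateCount)
counts.print()

ssc.start()
ssc.awaitTermination()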
Re: 1.3 release
Yes, I did, with these arguments: --tgz -Pyarn -Phadoop-2.4 -Phive -Phive-thriftserver To be more specific about what is not working, when I launch spark-shell --master yarn, I get this error immediately after launch. I have no idea from looking at the source. java.lang.NullPointerException at org.apache.spark.sql.SQLContext.<init>(SQLContext.scala:141) at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:49) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:408) at org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1027) at $iwC$$iwC.<init>(<console>:9) On Tue, Mar 17, 2015 at 7:43 AM, Sean Owen so...@cloudera.com wrote: OK, did you build with YARN support (-Pyarn)? and the right incantation of flags like -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.2 or similar? On Tue, Mar 17, 2015 at 2:39 PM, Eric Friedman eric.d.fried...@gmail.com wrote: I did not find that the generic build worked. In fact I also haven't gotten a build from source to work either, though that one might be a case of PEBCAK. In the former case I got errors about the build not having YARN support. On Sun, Mar 15, 2015 at 3:03 AM, Sean Owen so...@cloudera.com wrote: I think (I hope) it's because the generic builds just work. Even though these are of course distributed mostly verbatim in CDH5, with tweaks to be compatible with other stuff at the edges, the stock builds should be fine too. Same for HDP as I understand. The CDH4 build may work on some builds of CDH4, but I think it is lurking there as a Hadoop 2.0.x plus a certain YARN beta build. I'd prefer to rename it that way, myself, since it doesn't actually work with all of CDH4 anyway. Are the MapR builds there because the stock Hadoop build doesn't work on MapR? That would actually surprise me, but then, why are these two builds distributed? On Sun, Mar 15, 2015 at 6:22 AM, Eric Friedman eric.d.fried...@gmail.com wrote: Is there a reason why the prebuilt releases don't include current CDH distros and YARN support? Eric Friedman
Re: Upgrade from Spark 1.1.0 to 1.1.1+ Issues
Hi Akhil, sc.parallelize(1 to 1).collect() in the Spark shell on Spark v1.2.0 runs fine. However, if I do the following remotely, it will throw an exception: val sc: SparkContext = new SparkContext(conf) val NUM_SAMPLES = 10 val count = sc.parallelize(1 to NUM_SAMPLES).map { i => val x = Math.random() val y = Math.random() if (x*x + y*y < 1) 1 else 0 }.reduce(_ + _) println("Pi is roughly " + 4.0 * count / NUM_SAMPLES) Exception: 15/03/17 17:33:52 ERROR scheduler.TaskSchedulerImpl: Lost executor 1 on hcompute32228.sjc9.service-now.com: remote Akka client disassociated 15/03/17 17:33:52 INFO scheduler.TaskSetManager: Re-queueing tasks for 1 from TaskSet 0.0 15/03/17 17:33:52 WARN scheduler.TaskSetManager: Lost task 1.1 in stage 0.0 (TID 3, hcompute32228): ExecutorLostFailure (executor lost) 15/03/17 17:33:52 INFO scheduler.DAGScheduler: Executor lost: 1 (epoch 3) 15/03/17 17:33:52 INFO storage.BlockManagerMasterActor: Trying to remove executor 1 from BlockManagerMaster. 15/03/17 17:33:52 INFO storage.BlockManagerMaster: Removed 1 successfully in removeExecutor 15/03/17 17:34:39 ERROR Remoting: org.apache.spark.storage.BlockManagerId; local class incompatible: stream classdesc serialVersionUID = 2439208141545036836, local class serialVersionUID = -7366074099953117729 java.io.InvalidClassException: org.apache.spark.storage.BlockManagerId; local class incompatible: stream classdesc serialVersionUID = 2439208141545036836, local class serialVersionUID = -7366074099953117729 at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:604) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515) v1.1.0 is totally fine, but v1.1.1 and v1.2.0+ are not. Are there any special instructions for setting up a Spark cluster for later versions? Do you know if there is anything I'm missing? Thank you for your help, Eason On Mon, Mar 16, 2015 at 11:51 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Could you tell me what all you did to change the version of spark? Can you fire up a spark-shell and write this line and see what happens: sc.parallelize(1 to 1).collect() Thanks Best Regards On Mon, Mar 16, 2015 at 11:13 PM, Eason Hu eas...@gmail.com wrote: Hi Akhil, Yes, I did change both versions on the project and the cluster. Any clues? Even the sample code from the Spark website failed to work. Thanks, Eason On Sun, Mar 15, 2015 at 11:56 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you change both the versions? The one in your build file of your project and the spark version of your cluster? Thanks Best Regards On Sat, Mar 14, 2015 at 6:47 AM, EH eas...@gmail.com wrote: Hi all, I've been using Spark 1.1.0 for a while, and now would like to upgrade to Spark 1.1.1 or above. 
However, it throws the following errors: 18:05:31.522 [sparkDriver-akka.actor.default-dispatcher-3] ERROR TaskSchedulerImpl - Lost executor 37 on hcompute001: remote Akka client disassociated 18:05:31.530 [sparkDriver-akka.actor.default-dispatcher-3] WARN TaskSetManager - Lost task 0.0 in stage 1.0 (TID 0, hcompute001): ExecutorLostFailure (executor lost) 18:05:31.567 [sparkDriver-akka.actor.default-dispatcher-2] ERROR TaskSchedulerImpl - Lost executor 3 on hcompute001: remote Akka client disassociated 18:05:31.568 [sparkDriver-akka.actor.default-dispatcher-2] WARN TaskSetManager - Lost task 1.0 in stage 1.0 (TID 1, hcompute001): ExecutorLostFailure (executor lost) 18:05:31.988 [sparkDriver-akka.actor.default-dispatcher-23] ERROR TaskSchedulerImpl - Lost executor 24 on hcompute001: remote Akka client disassociated Do you know what may have gone wrong? I didn't change any code, just the version of Spark. Thank you all, Eason
Re: Can LBFGS be used on streaming data?
Hello Jeremy, Thank you for your reply. When I am running this code on the local machine on streaming data, it keeps giving me this error: WARN TaskSetManager: Lost task 2.0 in stage 211.0 (TID 4138, localhost): java.io.FileNotFoundException: /tmp/spark-local-20150316165742-9ac0/27/shuffle_102_2_0.data (No such file or directory) And when I execute the same code on static data after randomly splitting it into 5 sets, it gives me slightly different weights (the difference is in the decimals). I am still trying to analyse why this would be happening. Any inputs on why this might happen? Best Regards, Arunkumar On Tue, Mar 17, 2015 at 11:32 AM, Jeremy Freeman freeman.jer...@gmail.com wrote: Hi Arunkumar, That looks like it should work. Logically, it’s similar to the implementation used by StreamingLinearRegression and StreamingLogisticRegression, see this class: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala which exposes the kind of operation you're describing (for any linear method). The nice thing about the gradient-based methods is that they can use existing MLlib optimization routines in this fairly direct way. Other methods (such as KMeans) require a bit more reengineering. — Jeremy - jeremyfreeman.net @thefreemanlab On Mar 16, 2015, at 6:19 PM, EcoMotto Inc. ecomot...@gmail.com wrote: Hello, I am new to the Spark Streaming API. I wanted to ask if I can apply LBFGS (with LeastSquaresGradient) on streaming data? Currently I am using foreachRDD for parsing through the DStream and I am generating a model based on each RDD. Am I doing anything logically wrong here? Thank you. Sample Code: val algorithm = new LBFGS(new LeastSquaresGradient(), new SimpleUpdater()) var initialWeights = Vectors.dense(Array.fill(numFeatures)(scala.util.Random.nextDouble())) var isFirst = true var model = new LinearRegressionModel(null, 1.0) parsedData.foreachRDD { rdd => if (isFirst) { val weights = algorithm.optimize(rdd, initialWeights) val w = weights.toArray val intercept = w.head model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept) isFirst = false } else { var ab = ArrayBuffer[Double]() ab.insert(0, model.intercept) ab.appendAll(model.weights.toArray) print("Intercept = " + model.intercept + " :: modelWeights = " + model.weights) initialWeights = Vectors.dense(ab.toArray) print("Initial Weights: " + initialWeights) val weights = algorithm.optimize(rdd, initialWeights) val w = weights.toArray val intercept = w.head model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept) } } Best Regards, Arunkumar
Re: Question on Spark 1.3 SQL External Datasource
Hey Yang, My comments are in-lined below. Cheng On 3/18/15 6:53 AM, Yang Lei wrote: Hello, I am migrating my Spark SQL external datasource integration from Spark 1.2.x to Spark 1.3. I noticed there are a couple of new filters now, e.g. org.apache.spark.sql.sources.And. However, for a SQL query with condition A AND B, I noticed PrunedFilteredScan.buildScan still gets an Array[Filter] with 2 filters, A and B, whereas I expected to get one And filter with left == A and right == B. So my first question is: where can I find the rules for converting a SQL condition into the filters passed to PrunedFilteredScan.buildScan? Top-level AND predicates are always broken into smaller sub-predicates. The And filter that appears in the external data sources API is for nested predicates, like A OR (NOT (B AND C)). I do like what I see in these And, Or, Not filters, which allow recursive nesting to connect filters together. If this is the direction we are heading in, my second question is: do we just need one Filter object instead of an Array[Filter] in buildScan? For data sources with further filter push-down ability (e.g. Parquet), breaking down top-level AND predicates for them can be convenient. The third question is: what is the plan for allowing a relation provider to inform Spark which filters are handled already, so that there is no redundant filtering? Yeah, this is a good point. I guess we can add some method like filterAccepted to PrunedFilteredScan. Appreciate comments and links to any existing documentation or discussion. Yang
Using Spark with a SOCKS proxy
I'm trying to figure out how I might be able to use Spark with a SOCKS proxy. That is, my dream is to be able to write code in my IDE then run it without much trouble on a remote cluster, accessible only via a SOCKS proxy between the local development machine and the master node of the cluster (ignoring, for now, any dependencies that would need to be transferred--assume it's a very simple app with no dependencies that aren't part of the Spark classpath on the cluster). This is possible with Hadoop by setting hadoop.rpc.socket.factory.class.default to org.apache.hadoop.net.SocksSocketFactory and hadoop.socks.server to localhost:port on which a SOCKS proxy has been opened via ssh -D to the master node. However, I can't seem to find anything like this for Spark, and I only see very few mentions of it on the user list and on stackoverflow, with no real answers. (See links below.) I thought I might be able to use the JVM's -DsocksProxyHost and -DsocksProxyPort system properties, but it still does not seem to work. That is, if I start a SOCKS proxy to my master node using something like ssh -D 2600 master node public name then run a simple Spark app that calls SparkConf.setMaster(spark://master node private IP:7077), passing in JVM args of -DsocksProxyHost=localhost -DsocksProxyPort=2600, the driver hangs for a while before finally giving up (Application has been killed. Reason: All masters are unresponsive! Giving up.). It seems like it is not even attempting to use the SOCKS proxy. Do -DsocksProxyHost/-DsocksProxyPort not even work for Spark? http://stackoverflow.com/questions/28047000/connect-to-spark-through-a-socks-proxy (unanswered similar question from somebody else about a month ago) https://issues.apache.org/jira/browse/SPARK-5004 (unresolved, somewhat related JIRA from a few months ago) Thanks, Jonathan
ML Pipeline question about caching
Hello all: I am using the ML Pipeline, which I consider very powerful. I have the following use case: - I have three transformers, which I will call A, B, C, that basically extract features from text files, with no parameters. - I have a final stage D, which is the logistic regression estimator. - I am creating a pipeline with the sequence A, B, C, D. - Finally, I am using this pipeline as the estimator parameter of the CrossValidator class. I have some concerns about how data persistence inside the cross validator works. For example, if only D has multiple parameters to tune using the cross validator, my concern is that the transformation A->B->C is being performed multiple times. Is that the case, or is Spark smart enough to realize that it is possible to persist the output of C? Would it be better to leave A, B, and C outside the cross validator pipeline? Thanks a lot -- Cesar Flores
Re: ML Pipeline question about caching
Hi Cesar, I had a similar issue. Yes, for now it’s better to do A, B, C outside a cross validator. Take a look at my comment https://issues.apache.org/jira/browse/SPARK-4766?focusedCommentId=14320038&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14320038 and this jira https://issues.apache.org/jira/browse/SPARK-5844. The problem is that transformers could also have hyperparameters in the future (like a word2vec transformer). Then the cross validator would need to find the best parameters for both transformer + estimator. It would blow up the number of combinations (number of transformer parameters * number of estimator parameters * number of folds). Thanks, Peter Rudenko On 2015-03-18 00:26, Cesar Flores wrote: Hello all: I am using the ML Pipeline, which I consider very powerful. I have the following use case: * I have three transformers, which I will call A, B, C, that basically extract features from text files, with no parameters. * I have a final stage D, which is the logistic regression estimator. * I am creating a pipeline with the sequence A, B, C, D. * Finally, I am using this pipeline as the estimator parameter of the CrossValidator class. I have some concerns about how data persistence inside the cross validator works. For example, if only D has multiple parameters to tune using the cross validator, my concern is that the transformation A->B->C is being performed multiple times. Is that the case, or is Spark smart enough to realize that it is possible to persist the output of C? Would it be better to leave A, B, and C outside the cross validator pipeline? Thanks a lot -- Cesar Flores
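A sketch of the workaround, with concrete stages standing in for A, B, C, D (trainingDF is an assumed DataFrame with "text" and "label" columns): run the feature stages once, cache the result, and cross-validate only the estimator.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Feature stages (the A,B,C analogue) run exactly once, outside the cross validator:
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val tf = new HashingTF().setInputCol("words").setOutputCol("features")
val features = new Pipeline().setStages(Array(tokenizer, tf))
  .fit(trainingDF).transform(trainingDF).cache()

// Only the estimator (the D analogue) is tuned:
val lr = new LogisticRegression()
val grid = new ParamGridBuilder().addGrid(lr.regParam, Array(0.01, 0.1)).build()
val cv = new CrossValidator()
  .setEstimator(lr)
  .setEstimatorParamMaps(grid)
  .setEvaluator(new BinaryClassificationEvaluator())
val model = cv.fit(features) // the A-B-C work is not repeated per fold/parameter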
Re: Hanging tasks in spark 1.2.1 while working with 1.1.1
Doing the reduceByKey without changing the number of partitions and then doing a coalesce works. But the other version still hangs, without any information (whereas it worked with spark 1.1.1). The previous logs don't seem to be related to what happens. I don't think this is a memory issue as the GC time remains low and the shuffle read is small. My guess is that it might be related to a high number of initial partitions, but in that case shouldn't it fail for coalesce too...? Does anyone have an idea where to look to find the source of the problem? Thanks, Eugen 2015-03-13 19:18 GMT+01:00 Eugen Cepoi cepoi.eu...@gmail.com: Hum, increased it to 1024 but it doesn't help, still the same problem :( 2015-03-13 18:28 GMT+01:00 Eugen Cepoi cepoi.eu...@gmail.com: The default one, 0.07 of executor memory. I'll try increasing it and post back the result. Thanks 2015-03-13 18:09 GMT+01:00 Ted Yu yuzhih...@gmail.com: Might be related: what's the value for spark.yarn.executor.memoryOverhead ? See SPARK-6085 Cheers On Fri, Mar 13, 2015 at 9:45 AM, Eugen Cepoi cepoi.eu...@gmail.com wrote: Hi, I have a job that hangs after upgrading to spark 1.2.1 from 1.1.1. Strange thing, the exact same code does work (after the upgrade) in the spark-shell. But this information might be misleading as it works with 1.1.1... The job takes as input two data sets: - rdd A of 170+ GB (with less it is hard to reproduce) and more than 11K partitions - rdd B of 100+ MB and 32 partitions I run it via EMR over YARN and use 4*m3.xlarge computing nodes. I am not sure the executor config is relevant here. Anyway I tried with multiple small executors with less RAM and the inverse. The job basically does this: A.flatMap(...).union(B).keyBy(f).reduceByKey(..., 32).map(...).save After the flatMap, rdd A's size is much smaller, similar to B. Configs I used to run this job: storage.memoryFraction: 0 shuffle.memoryFraction: 0.5 akka.timeout 500 akka.frameSize 40 // this one defines also the memory used by the yarn master, but not sure if it is important driver.memory 5g executor.memory 4250m I have 7 executors with 2 cores. What happens: The job produces two stages: keyBy and save. The keyBy stage runs fine and produces a shuffle write of ~150mb. The save stage, where the shuffle read occurs, hangs. The greater the initial dataset, the more tasks hang. I did run it for much larger datasets with the same config/cluster but without doing the union, and it worked fine. Some more infos and logs: Amongst the 4 nodes, 1 finished all its tasks and the running ones are on the 3 other nodes. But I am not sure this is useful information (one node that completed all its work vs the others), as with some smaller datasets I manage to get only one hanging task. Here are the last parts of the executor logs that show some timeouts. An executor from node ip-10-182-98-220 15/03/13 15:43:10 INFO storage.ShuffleBlockFetcherIterator: Started 6 remote fetches in 66 ms 15/03/13 15:58:44 WARN server.TransportChannelHandler: Exception in connection from /10.181.48.153:56806 java.io.IOException: Connection timed out An executor from node ip-10-181-103-186 15/03/13 15:43:22 INFO storage.ShuffleBlockFetcherIterator: Started 6 remote fetches in 20 ms 15/03/13 15:58:41 WARN server.TransportChannelHandler: Exception in connection from /10.182.98.220:38784 java.io.IOException: Connection timed out An executor from node ip-10-181-48-153 (all the logs below belong to this node) 15/03/13 15:43:24 INFO executor.Executor: Finished task 26.0 in stage 1.0 (TID 13860). 
802 bytes result sent to driver 15/03/13 15:58:43 WARN server.TransportChannelHandler: Exception in connection from /10.181.103.186:46381 java.io.IOException: Connection timed out Followed by many 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=2064203432016, chunkIndex=405}, buffer=FileSegmentManagedBuffer{file=/mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1426256247374_0002/spark-1659efcd-c6b6-4a12-894d-e869486d3d00/35/shuffle_0_9885_0.data, offset=8631, length=571}} to /10.181.103.186:46381; closing connection java.nio.channels.ClosedChannelException with the last one being 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending result RpcResponse{requestId=7377187355282895939, response=[B@6fcd0014} to /10.181.103.186:46381; closing connection java.nio.channels.ClosedChannelException The executors from the node that finished its tasks don't show anything special. Note that I don't cache anything, thus I reduced storage.memoryFraction to 0. I see some of those, but don't think they are related. 15/03/13 15:43:15 INFO storage.MemoryStore: Memory use = 0.0 B (blocks) + 0.0 B (scratch space shared across 0 thread(s)) = 0.0 B. Storage limit = 0.0 B. Sorry for
Memory Settings for local execution context
So the page that talks about settings: http://spark.apache.org/docs/1.2.1/configuration.html seems not to apply when running local contexts. I have a shell script that starts my job: export SPARK_MASTER_OPTS="-Dsun.io.serialization.extendedDebugInfo=true" export SPARK_WORKER_OPTS="-Dsun.io.serialization.extendedDebugInfo=true" /Users/spark/spark/bin/spark-submit \ --class jobs.MyJob \ --master local[1] \ --conf spark.executor.memory=8g \ --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ --conf spark.driver.memory=10g \ --conf spark.executor.extraJavaOptions=-Dsun.io.serialization.extendedDebugInfo=true \ target/scala-2.10/my-job.jar And when I largely remove spark-defaults.conf and spark-env.sh, I get a running job that shows only 265MB of memory for an executor! I have no setting specified inside the jar for the SparkConf object as far as I can tell. How can I get my executor memory up to be nice and big? Thanks, Alex
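A likely explanation and fix, hedged since it depends on the deploy mode: with --master local[N] the executor runs inside the driver JVM, so spark.executor.memory is effectively ignored, and spark.driver.memory set via --conf comes too late because the driver JVM has already started. Sizing the heap with the --driver-memory launch flag is the usual answer:

# In local mode the driver JVM hosts the in-process executor, so give it the heap.
# --driver-memory must be a launch flag; spark.driver.memory via --conf is too late.
/Users/spark/spark/bin/spark-submit \
  --class jobs.MyJob \
  --master local[1] \
  --driver-memory 10g \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  target/scala-2.10/my-job.jar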
Re: Should I do spark-sql query on HDFS or apache hive?
Did you mean that parquet is faster than the Hive default format, and the Hive format is faster than plain HDFS files, for Spark SQL? :) 2015-03-18 1:23 GMT+08:00 Michael Armbrust mich...@databricks.com: The performance has more to do with the particular format you are using, not where the metadata is coming from. Even Hive tables are usually read from files on HDFS. You probably should use HiveContext as its query language is more powerful than SQLContext. Also, parquet is usually the faster data format for Spark SQL. On Tue, Mar 17, 2015 at 3:41 AM, 李铖 lidali...@gmail.com wrote: Hi, everybody. I am new to Spark. Now I want to do interactive SQL queries using Spark SQL. Spark SQL can run on Hive tables or load files from HDFS directly. Which is better or faster? Thanks.
Re: Hanging tasks in spark 1.2.1 while working with 1.1.1
FWIW, I observed similar behavior in a similar situation. I was able to work around it by forcing one of the RDDs into cache right before the union, triggering that with take(1). Nothing else ever helped. Seems like a yet-undiscovered 1.2.x thing. On Tue, Mar 17, 2015 at 4:21 PM, Eugen Cepoi cepoi.eu...@gmail.com wrote: Doing the reduceByKey without changing the number of partitions and then doing a coalesce works. But the other version still hangs, without any information (whereas it worked with spark 1.1.1). The previous logs don't seem to be related to what happens. I don't think this is a memory issue as the GC time remains low and the shuffle read is small. My guess is that it might be related to a high number of initial partitions, but in that case shouldn't it fail for coalesce too...? Does anyone have an idea where to look to find the source of the problem? Thanks, Eugen 2015-03-13 19:18 GMT+01:00 Eugen Cepoi cepoi.eu...@gmail.com: Hum, increased it to 1024 but it doesn't help, still the same problem :( 2015-03-13 18:28 GMT+01:00 Eugen Cepoi cepoi.eu...@gmail.com: The default one, 0.07 of executor memory. I'll try increasing it and post back the result. Thanks 2015-03-13 18:09 GMT+01:00 Ted Yu yuzhih...@gmail.com: Might be related: what's the value for spark.yarn.executor.memoryOverhead ? See SPARK-6085 Cheers On Fri, Mar 13, 2015 at 9:45 AM, Eugen Cepoi cepoi.eu...@gmail.com wrote: Hi, I have a job that hangs after upgrading to spark 1.2.1 from 1.1.1. Strange thing, the exact same code does work (after the upgrade) in the spark-shell. But this information might be misleading as it works with 1.1.1... The job takes as input two data sets: - rdd A of 170+ GB (with less it is hard to reproduce) and more than 11K partitions - rdd B of 100+ MB and 32 partitions I run it via EMR over YARN and use 4*m3.xlarge computing nodes. I am not sure the executor config is relevant here. Anyway I tried with multiple small executors with less RAM and the inverse. The job basically does this: A.flatMap(...).union(B).keyBy(f).reduceByKey(..., 32).map(...).save After the flatMap, rdd A's size is much smaller, similar to B. Configs I used to run this job: storage.memoryFraction: 0 shuffle.memoryFraction: 0.5 akka.timeout 500 akka.frameSize 40 // this one defines also the memory used by the yarn master, but not sure if it is important driver.memory 5g executor.memory 4250m I have 7 executors with 2 cores. What happens: The job produces two stages: keyBy and save. The keyBy stage runs fine and produces a shuffle write of ~150mb. The save stage, where the shuffle read occurs, hangs. The greater the initial dataset, the more tasks hang. I did run it for much larger datasets with the same config/cluster but without doing the union, and it worked fine. Some more infos and logs: Amongst the 4 nodes, 1 finished all its tasks and the running ones are on the 3 other nodes. But I am not sure this is useful information (one node that completed all its work vs the others), as with some smaller datasets I manage to get only one hanging task. Here are the last parts of the executor logs that show some timeouts. 
An executor from node ip-10-182-98-220 15/03/13 15:43:10 INFO storage.ShuffleBlockFetcherIterator: Started 6 remote fetches in 66 ms 15/03/13 15:58:44 WARN server.TransportChannelHandler: Exception in connection from /10.181.48.153:56806 java.io.IOException: Connection timed out An executor from node ip-10-181-103-186 15/03/13 15:43:22 INFO storage.ShuffleBlockFetcherIterator: Started 6 remote fetches in 20 ms 15/03/13 15:58:41 WARN server.TransportChannelHandler: Exception in connection from /10.182.98.220:38784 java.io.IOException: Connection timed out An executor from node ip-10-181-48-153 (all the logs below belong to this node) 15/03/13 15:43:24 INFO executor.Executor: Finished task 26.0 in stage 1.0 (TID 13860). 802 bytes result sent to driver 15/03/13 15:58:43 WARN server.TransportChannelHandler: Exception in connection from /10.181.103.186:46381 java.io.IOException: Connection timed out Followed by many 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=2064203432016, chunkIndex=405}, buffer=FileSegmentManagedBuffer{file=/mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1426256247374_0002/spark-1659efcd-c6b6-4a12-894d-e869486d3d00/35/shuffle_0_9885_0.data, offset=8631, length=571}} to /10.181.103.186:46381; closing connection java.nio.channels.ClosedChannelException with the last one being 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending result RpcResponse{requestId=7377187355282895939, response=[B@6fcd0014} to /10.181.103.186:46381; closing connection java.nio.channels.ClosedChannelException The executors from the node that finished its tasks don't show anything
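A sketch of that workaround, with toy data standing in for the A and B inputs from the thread (the key point is only the cache() plus take(1) before the union):

val A = sc.parallelize(1 to 1000000).map(i => (i % 100, 1L))
val B = sc.parallelize(1 to 32).map(i => (i, 1L))

val bCached = B.cache()
bCached.take(1) // forces B to be materialized into the cache before the union

val out = A.union(bCached).reduceByKey(_ + _, 32)
out.count()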
Question on RDD groupBy and executors
Hi, I am doing a groupBy on an EdgeRDD like this, val groupedEdges = graph.edges.groupBy[VertexId](func0) while(true) { val info = groupedEdges.flatMap(func1).collect.foreach(func2) } The groupBy distributes the data to different executors on different nodes in the cluster. Given a key K (a vertexId identifying a particular group in *groupedEdges*), is there a way to find details such as - which executor is responsible for K? - which node in the cluster the executor containing K resides on? - access that specific executor (and possibly assign a task) from the driver? Thanks.
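There is no public API that maps a key to an executor, but once an explicit partitioner is supplied the key-to-partition mapping is at least deterministic, which answers part of the question. A sketch (graph and func0 are the names from the question above; everything else is illustrative):

import org.apache.spark.HashPartitioner
import org.apache.spark.graphx.VertexId

val part = new HashPartitioner(16)
val groupedEdges = graph.edges.groupBy[VertexId](func0, part)

// The partition index that holds all values for a given key:
val k: VertexId = 42L
val pid = part.getPartition(k)

// Correlate partition indexes with executors/hosts via the task list in the UI,
// or inspect partition contents directly:
groupedEdges.mapPartitionsWithIndex { (idx, it) =>
  Iterator((idx, it.size))
}.collect().foreach(println)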
Log4j files per spark job
Hey guys, Looking for a bit of help on logging. I'm trying to get Spark to write log4j logs per job within a Spark cluster. So for example, I'd like: $SPARK_HOME/logs/job1.log.x $SPARK_HOME/logs/job2.log.x And I want this on the driver and on the executor. I'm trying to accomplish this by using a log4j.properties file in each job's resources, but it isn't logging properly. How can I get job-level logs on the executor and driver? Thanks in advance for taking the time to respond. D
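One pattern that generally works (hedged, since exact flag behavior differs between deploy modes, and file paths here are placeholders): give each job its own log4j.properties, ship it with --files, and point the driver and executors at it via extraJavaOptions.

# job1-log4j.properties
log4j.rootLogger=INFO, file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=/var/log/spark/job1.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d %p %c: %m%n

spark-submit --class jobs.Job1 \
  --files /path/to/job1-log4j.properties \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=job1-log4j.properties" \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=file:/path/to/job1-log4j.properties" \
  job1.jar

The driver (in client mode) reads the file from its local path, hence the file: URL, while executors pick up the bare name of the file shipped via --files.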
Re: HiveContext can't find registered function
Thank you for the answer, and one more question: what does 'resolved attribute' mean? On Mar 17, 2015 8:14 PM, Yin Huai yh...@databricks.com wrote: The number is an id we use internally to identify a resolved Attribute. Looks like basic_null_diluted_d was not resolved, since there is no id associated with it. On Tue, Mar 17, 2015 at 2:08 PM, Ophir Cohen oph...@gmail.com wrote: Interesting, I thought the problem was with the method itself. I will check it soon and update. Can you elaborate on what the # and the number mean? Is that a reference to the field in the RDD? Thank you, Ophir On Mar 17, 2015 7:06 PM, Yin Huai yh...@databricks.com wrote: Seems basic_null_diluted_d was not resolved? Can you check if basic_null_diluted_d is in your table? On Tue, Mar 17, 2015 at 9:34 AM, Ophir Cohen oph...@gmail.com wrote: Hi Guys, I'm registering a function using: sqlc.registerFunction("makeEstEntry", ReutersDataFunctions.makeEstEntry _) Then I register the table and try to query the table using that function and I get: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'makeEstEntry(numest#20,median#21,mean#22,stddev#23,high#24,low#25,currency_#26,units#27,'basic_null_diluted_d) AS FY0#2837, tree: Thanks! Ophir
Re: IllegalAccessError in GraphX (Spark 1.3.0 LDA)
Please check your classpath and make sure you don't have multiple Spark versions deployed. If the classpath looks correct, please create a JIRA for this issue. Thanks! -Xiangrui On Tue, Mar 17, 2015 at 2:03 AM, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hi all, I'm trying to use the new LDA in mllib, but when trying to train the model, I'm getting the following error: java.lang.IllegalAccessError: tried to access class org.apache.spark.util.collection.Sorter from class org.apache.spark.graphx.impl.EdgePartitionBuilder at org.apache.spark.graphx.impl.EdgePartitionBuilder.toEdgePartition(EdgePartitionBuilder.scala:39) at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:109) Has anyone seen this yet and has an idea what the problem might be? It happens both with the provided sample data and with my own corpus. Full code + more stack below. Thx and Regards, Jeff Code: -- object LdaTest { def main(args: Array[String]) = { val conf = new SparkConf().setAppName("LDA").setMaster("local[4]") val sc = new SparkContext(conf) //val data = scala.io.Source.fromFile("/home/jeff/nmf_compare/scikit_v.txt").getLines().toList //val parsedData = data.map(s => Vectors.dense(s.trim().split(" ").map(_.toDouble))) //val corpus = parsedData.zipWithIndex.map( t => (t._2.toLong, t._1) ) //val data = sc.textFile("/home/jeff/nmf_compare/scikit_v.txt") val data = sc.textFile("/home/jeff/Downloads/spark-1.3.0-bin-hadoop2.4/data/mllib/sample_lda_data.txt") val parsedData = data.map(s => Vectors.dense(s.trim().split(" ").map(_.toDouble))) val corpus = parsedData.zipWithIndex.map(_.swap).cache() //val parCorpus = sc.parallelize(corpus) //println(parCorpus) val ldaModel = new LDA().setK(10).run(corpus) println(ldaModel) } } Stack: ... 15/03/17 09:48:50 INFO spark.CacheManager: Partition rdd_8_0 not found, computing it 15/03/17 09:48:50 INFO spark.CacheManager: Partition rdd_8_1 not found, computing it 15/03/17 09:48:50 INFO spark.CacheManager: Another thread is loading rdd_8_0, waiting for it to finish... 15/03/17 09:48:50 INFO storage.BlockManager: Found block rdd_4_0 locally 15/03/17 09:48:50 INFO spark.CacheManager: Partition rdd_4_1 not found, computing it 15/03/17 09:48:50 INFO spark.CacheManager: Another thread is loading rdd_8_1, waiting for it to finish... 
15/03/17 09:48:50 INFO rdd.HadoopRDD: Input split: file:/home/jeff/Downloads/spark-1.3.0-bin-hadoop2.4/data/mllib/sample_lda_data.txt:132+132 15/03/17 09:48:50 INFO storage.MemoryStore: ensureFreeSpace(1048) called with curMem=47264, maxMem=1965104824 15/03/17 09:48:50 INFO spark.CacheManager: Finished waiting for rdd_8_0 15/03/17 09:48:50 ERROR executor.Executor: Exception in task 0.0 in stage 3.0 (TID 3) java.lang.IllegalAccessError: tried to access class org.apache.spark.util.collection.Sorter from class org.apache.spark.graphx.impl.EdgePartitionBuilder at org.apache.spark.graphx.impl.EdgePartitionBuilder.toEdgePartition(EdgePartitionBuilder.scala:39) at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:109) at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:104) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:609) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:609) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.graphx.EdgeRDD.compute(EdgeRDD.scala:49) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/03/17 09:48:50 INFO spark.CacheManager: Whoever was loading rdd_8_0 failed; we'll try it ourselves 15/03/17 09:48:50 INFO storage.MemoryStore: Block rdd_4_1 stored as values in memory
Using regular rdd transforms on schemaRDD
Hi All, I was wondering how rdd transformations work on SchemaRDDs. Is there a way to force the rdd transform to keep the SchemaRDD type, or do I need to recreate the SchemaRDD by applying the applySchema method? Currently what I have is an array of SchemaRDDs and I just want to do a union across them, i.e. I want the result to be one SchemaRDD with the union of all the SchemaRDDs in the array. This is what I currently have that is not working: scala> z res23: Array[org.apache.spark.sql.SchemaRDD] scala> z.reduce((a,b) => a.union(b)) I get the following error: found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] required: org.apache.spark.sql.SchemaRDD z.reduce((a,b) => a.union(b)) I also noticed that when I do a simple join: z(0).join(z(1)) the result back is not a SchemaRDD, but a normal RDD: scala> z(0).union(z(1)) res22: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] Is there a simple way for me to convert back to SchemaRDD or do I need to HiveContext.applySchema(res22, myschema)?
Spark 1.0.2 failover doesnt port running application context to new master
We have a Spark 1.0.2 cluster with 3 nodes under an HA setup using zookeeper. We have a long-running self-contained Spark service that serves on-demand requests. I tried to do a failover test by killing the Spark master and seeing if our application gets ported over to the new master. It looks like killing the master doesn't really kill the executors that were created by the application. So our application is still able to serve requests, but the problem is I can no longer see our application running in the UI. Probably just an issue of not having a history server? The driver UI still works. Can someone confirm this? I am also attaching a screenshot of the new master console that displays the Running Applications section. Nirav
Re: RDD to DataFrame for using ALS under org.apache.spark.ml.recommendation.ALS
Please remember to copy the user list next time. I might not be able to respond quickly. There are many others who can help or who can benefit from the discussion. Thanks! -Xiangrui On Tue, Mar 17, 2015 at 12:04 PM, Jay Katukuri jkatuk...@apple.com wrote: Great Xiangrui. It works now. Sorry that I needed to bug you :) Jay On Mar 17, 2015, at 11:48 AM, Xiangrui Meng men...@gmail.com wrote: Please check this section in the user guide: http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection You need `import sqlContext.implicits._` to use `toDF()`. -Xiangrui On Mon, Mar 16, 2015 at 2:34 PM, Jay Katukuri jkatuk...@apple.com wrote: Hi Xiangrui, Thanks a lot for the quick reply. I am still facing an issue. I have tried the code snippet that you have suggested: val ratings = purchase.map { line => line.split(',') match { case Array(user, item, rate) => (user.toInt, item.toInt, rate.toFloat) }.toDF("user", "item", "rate")} for this, I got the below error: error: ';' expected but '.' found. [INFO] }.toDF("user", "item", "rate")} [INFO] ^ when I tried the below code val ratings = purchase.map ( line => line.split(',') match { case Array(user, item, rate) => (user.toInt, item.toInt, rate.toFloat) }).toDF("user", "item", "rate") I got: error: value toDF is not a member of org.apache.spark.rdd.RDD[(Int, Int, Float)] [INFO] possible cause: maybe a semicolon is missing before `value toDF'? [INFO] }).toDF("user", "item", "rate") I have looked at the document that you have shared and tried the following code: case class Record(user: Int, item: Int, rate: Double) val ratings = purchase.map(_.split(',')).map(r => Record(r(0).toInt, r(1).toInt, r(2).toDouble)).toDF("user", "item", "rate") for this, I got the below error: error: value toDF is not a member of org.apache.spark.rdd.RDD[Record] Appreciate your help! Thanks, Jay On Mar 16, 2015, at 11:35 AM, Xiangrui Meng men...@gmail.com wrote: Try this: val ratings = purchase.map { line => line.split(',') match { case Array(user, item, rate) => (user.toInt, item.toInt, rate.toFloat) }.toDF("user", "item", "rate") Doc for DataFrames: http://spark.apache.org/docs/latest/sql-programming-guide.html -Xiangrui On Mon, Mar 16, 2015 at 9:08 AM, jaykatukuri jkatuk...@apple.com wrote: Hi all, I am trying to use the new ALS implementation under org.apache.spark.ml.recommendation.ALS. The new method to invoke for training seems to be override def fit(dataset: DataFrame, paramMap: ParamMap): ALSModel. How do I create a DataFrame object from a ratings data set that is on HDFS? 
whereas the method in the old ALS implementation under org.apache.spark.mllib.recommendation.ALS was def train( ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, blocks: Int, seed: Long ): MatrixFactorizationModel My code to run the old ALS train method is as below: val sc = new SparkContext(conf) val pfile = args(0) val purchase = sc.textFile(pfile) val ratings = purchase.map(_.split(',') match { case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toInt) }) val model = ALS.train(ratings, rank, numIterations, 0.01) Now, for the new ALS fit method, I am trying to use the below code, but getting a compilation error: val als = new ALS() .setRank(rank) .setRegParam(regParam) .setImplicitPrefs(implicitPrefs) .setNumUserBlocks(numUserBlocks) .setNumItemBlocks(numItemBlocks) val sc = new SparkContext(conf) val pfile = args(0) val purchase = sc.textFile(pfile) val ratings = purchase.map(_.split(',') match { case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toInt) }) val model = als.fit(ratings.toDF()) I get an error that the method toDF() is not a member of org.apache.spark.rdd.RDD[org.apache.spark.ml.recommendation.ALS.Rating[Int]]. Appreciate the help! Thanks, Jay
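For the archives, the combination that resolves the toDF() errors in this thread (a sketch: purchase and the column names follow the thread, and the SQLContext creation is assumed):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // brings toDF() into scope for RDDs of tuples/case classes

val ratings = purchase.map(_.split(',') match {
  case Array(user, item, rate) => (user.toInt, item.toInt, rate.toFloat)
}).toDF("user", "item", "rate")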
Re: Garbage stats in Random Forest leaf node?
There are two cases: minInstancesPerNode not satisfied or minInfoGain not satisfied: https://github.com/apache/spark/blob/9b746f380869b54d673e3758ca5e4475f76c864a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala#L729 https://github.com/apache/spark/blob/9b746f380869b54d673e3758ca5e4475f76c864a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala#L745 On Tue, Mar 17, 2015 at 12:59 PM, Chang-Jia Wang c...@cjwang.us wrote: Just curious, why do most of the leaf nodes return None, but just a couple return the default? Why would the gain be invalid? C.J. On Mar 17, 2015, at 11:53 AM, Xiangrui Meng men...@gmail.com wrote: This is the default value (Double.MinValue) for invalid gain: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/tree/model/InformationGainStats.scala#L67 Please ignore it. Maybe we should update `toString` to use scientific notation. -Xiangrui On Mon, Mar 16, 2015 at 5:19 PM, cjwang c...@cjwang.us wrote: I dumped the trees in the random forest model, and occasionally saw a leaf node with strange stats: - pred=1.00 prob=0.80 imp=-1.00 gain=-17976931348623157.00 Here impurity = -1 and gain = a giant negative number. Normally, I would get a None from Node.stats at a leaf node. Here it printed because Some(s) matches: node.stats match { case Some(s) => println(" imp=%f gain=%f" format (s.impurity, s.gain)) case None => println() } Is it a bug? This doesn't seem to happen in the model from DecisionTree, but my data sets are limited.
Re: IllegalAccessError in GraphX (Spark 1.3.0 LDA)
Hi Xiangrui, thank you a lot for the hint! I just tried on another machine with a clean project and there it worked like a charm. Will retry on the other machine tomorrow. Regards, Jeff 2015-03-17 19:57 GMT+01:00 Xiangrui Meng men...@gmail.com: Please check your classpath and make sure you don't have multiple Spark versions deployed. If the classpath looks correct, please create a JIRA for this issue. Thanks! -Xiangrui On Tue, Mar 17, 2015 at 2:03 AM, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hi all, I'm trying to use the new LDA in mllib, but when trying to train the model, I'm getting the following error: java.lang.IllegalAccessError: tried to access class org.apache.spark.util.collection.Sorter from class org.apache.spark.graphx.impl.EdgePartitionBuilder at org.apache.spark.graphx.impl.EdgePartitionBuilder.toEdgePartition(EdgePartitionBuilder.scala:39) at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:109) Has anyone seen this yet and has an idea what the problem might be? It happens both with the provided sample data and with my own corpus. Full code + more stack below. Thx and Regards, Jeff Code: -- object LdaTest { def main(args: Array[String]) = { val conf = new SparkConf().setAppName("LDA").setMaster("local[4]") val sc = new SparkContext(conf) //val data = scala.io.Source.fromFile("/home/jeff/nmf_compare/scikit_v.txt").getLines().toList //val parsedData = data.map(s => Vectors.dense(s.trim().split(" ").map(_.toDouble))) //val corpus = parsedData.zipWithIndex.map( t => (t._2.toLong, t._1) ) //val data = sc.textFile("/home/jeff/nmf_compare/scikit_v.txt") val data = sc.textFile("/home/jeff/Downloads/spark-1.3.0-bin-hadoop2.4/data/mllib/sample_lda_data.txt") val parsedData = data.map(s => Vectors.dense(s.trim().split(" ").map(_.toDouble))) val corpus = parsedData.zipWithIndex.map(_.swap).cache() //val parCorpus = sc.parallelize(corpus) //println(parCorpus) val ldaModel = new LDA().setK(10).run(corpus) println(ldaModel) } } Stack: ... 15/03/17 09:48:50 INFO spark.CacheManager: Partition rdd_8_0 not found, computing it 15/03/17 09:48:50 INFO spark.CacheManager: Partition rdd_8_1 not found, computing it 15/03/17 09:48:50 INFO spark.CacheManager: Another thread is loading rdd_8_0, waiting for it to finish... 15/03/17 09:48:50 INFO storage.BlockManager: Found block rdd_4_0 locally 15/03/17 09:48:50 INFO spark.CacheManager: Partition rdd_4_1 not found, computing it 15/03/17 09:48:50 INFO spark.CacheManager: Another thread is loading rdd_8_1, waiting for it to finish... 
15/03/17 09:48:50 INFO rdd.HadoopRDD: Input split: file:/home/jeff/Downloads/spark-1.3.0-bin-hadoop2.4/data/mllib/sample_lda_data.txt:132+132 15/03/17 09:48:50 INFO storage.MemoryStore: ensureFreeSpace(1048) called with curMem=47264, maxMem=1965104824 15/03/17 09:48:50 INFO spark.CacheManager: Finished waiting for rdd_8_0 15/03/17 09:48:50 ERROR executor.Executor: Exception in task 0.0 in stage 3.0 (TID 3) java.lang.IllegalAccessError: tried to access class org.apache.spark.util.collection.Sorter from class org.apache.spark.graphx.impl.EdgePartitionBuilder at org.apache.spark.graphx.impl.EdgePartitionBuilder.toEdgePartition(EdgePartitionBuilder.scala:39) at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:109) at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:104) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:609) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:609) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.graphx.EdgeRDD.compute(EdgeRDD.scala:49) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at
Re: Using regular rdd transforms on schemaRDD
Looks like if I use unionAll, this works.
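i.e., something along these lines (z as in the question above). union comes from the plain RDD API and returns RDD[Row], while unionAll is defined on SchemaRDD and preserves the type:

scala> val combined = z.reduce((a, b) => a.unionAll(b))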
Spark SQL UDT Kryo serialization, Unable to find class
Hi, I want to introduce a custom type for SchemaRDD. I'm following this https://github.com/apache/spark/blob/branch-1.2/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala example. But I'm having Kryo serialization issues; here is the stack trace: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 22, localhost): com.esotericsoftware.kryo.KryoException: Unable to find class: com.gis.io.GeometryWritable Serialization trace: value (org.apache.spark.sql.catalyst.expressions.MutableAny) values (org.apache.spark.sql.catalyst.expressions.SpecificMutableRow) at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42) at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:80) at org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:46) at org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:45) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at 
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:228) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at
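In Kryo, "Unable to find class" is thrown when the named class cannot be loaded on the deserializing side, so the first thing to verify is that the jar containing com.gis.io.GeometryWritable is actually shipped to the executors. A minimal sketch, assuming a standalone application and a hypothetical jar path (not confirmed as the fix for this particular UDT case; SparkConf.registerKryoClasses is available from Spark 1.2):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // ship the jar that contains com.gis.io.GeometryWritable to the executors
  .setJars(Seq("/path/to/gis-types.jar"))
  // register the custom class with Kryo
  .registerKryoClasses(Array(classOf[com.gis.io.GeometryWritable]))
val sc = new SparkContext(conf)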
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Hi Bharath, Do you have these entries in your $SPARK_HOME/conf/spark-defaults.conf file? spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0-2041 spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041 On Tue, Mar 17, 2015 at 1:04 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Still no luck running a purpose-built 1.3 against HDP 2.2 after following all the instructions. Has anyone else faced this issue? On Mon, Mar 16, 2015 at 8:53 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Todd, Thanks for the help. I'll try again after building a distribution with the 1.3 sources. However, I wanted to confirm what I mentioned earlier: is it sufficient to copy the distribution only to the client host from where spark-submit is invoked (with spark.yarn.jar set), or does the entire distribution need to be pre-deployed on every host in the YARN cluster? I'd assume the latter shouldn't be necessary. On Mon, Mar 16, 2015 at 8:38 PM, Todd Nist tsind...@gmail.com wrote: Hi Bharath, I ran into the same issue a few days ago; here is a link to a post on Hortonworks' forum: http://hortonworks.com/community/forums/search/spark+1.2.1/ In case anyone else needs to do this, these are the steps I took to get it to work with Spark 1.2.1 as well as Spark 1.3.0-RC3: 1. Pull the 1.2.1 source. 2. Apply the following patches: a. Address the jackson version, https://github.com/apache/spark/pull/3938 b. Address the propagation of hdp.version set in spark-defaults.conf, https://github.com/apache/spark/pull/3409 3. Build with: $SPARK_HOME/make-distribution.sh --name hadoop2.6 --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Then deploy the resulting artifact (spark-1.2.1-bin-hadoop2.6.tgz) following the instructions in the HDP Spark preview, http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ FWIW, spark-1.3.0 appears to be working fine with HDP as well, and steps 2a and 2b are not required. HTH -Todd On Mon, Mar 16, 2015 at 10:13 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, Trying to run Spark (1.2.1 built for HDP 2.2) against a YARN cluster results in the AM failing to start with the following error on stderr: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher An application id was assigned to the job, but there were no logs. Note that the Spark distribution has not been installed on every host in the cluster; the aforementioned Spark build was copied to one of the Hadoop client hosts in the cluster to launch the job. Spark-submit was run with --master yarn-client, and spark.yarn.jar was set to the assembly jar from the above distribution. Switching the Spark distribution to the HDP-recommended version and following the instructions on this page http://hortonworks.com/hadoop-tutorial/using-apache-spark-hdp/ did not fix the problem either. Any idea what may have caused this error? Thanks, Bharath
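For reference, the client-side-only submission discussed above would look roughly like this (a sketch; the assembly path, version strings, and class name are placeholders, not values from the thread):

spark-submit --master yarn-client \
  --conf spark.yarn.jar=hdfs:///apps/spark/spark-assembly-1.3.0-hadoop2.6.0.jar \
  --class com.example.MyApp /path/to/myapp.jar

With spark.yarn.jar pointing at an assembly the cluster can read (e.g. on HDFS), YARN localizes the assembly itself, so the distribution only needs to exist on the submitting host.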
GraphX - Correct path traversal order from an Array[Edge[ED]]
Below is a listing from a filtered Array[Edge[ED]]:

scala> altGraph.edges.filter { case (edge) => edge.attr.contains("wien-krak-s103") }
  .collect.foreach { case (edge) => println(s"SrcId = ${edge.srcId}, DstId = ${edge.dstId}") }

SrcId = 1, DstId = 2
SrcId = 2, DstId = 3
...
SrcId = 8, DstId = 9
SrcId = 9, DstId = 10
SrcId = 11, DstId = 1
SrcId = 10, DstId = 11
SrcId = 11, DstId = 12
...
SrcId = 14, DstId = 15
SrcId = 15, DstId = 16
SrcId = 16, DstId = 98
SrcId = 98, DstId = 99
...

How can I get a listing in the correct path traversal order (see below)?

SrcId = 11, DstId = 1
SrcId = 1, DstId = 2
SrcId = 2, DstId = 3
...
SrcId = 8, DstId = 9
SrcId = 9, DstId = 10
SrcId = 10, DstId = 11
SrcId = 11, DstId = 12
...
SrcId = 14, DstId = 15
SrcId = 15, DstId = 16
SrcId = 16, DstId = 98
SrcId = 98, DstId = 99
...
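One way to produce that ordering (a sketch, not a general solution): collect the filtered edges, queue the outgoing edges per source vertex, and walk from the start vertex. This assumes the edge attribute type is String, that vertex 11 is known to be the start of the walk, and that where a vertex has several outgoing edges (like vertex 11 above) their order in the collected array matches the intended traversal:

import org.apache.spark.graphx.Edge
import scala.collection.mutable

val pathEdges = altGraph.edges
  .filter(e => e.attr.contains("wien-krak-s103"))
  .collect()

// outgoing edges per source vertex, kept in collection order
val out = mutable.Map.empty[Long, mutable.Queue[Edge[String]]]
pathEdges.foreach(e => out.getOrElseUpdate(e.srcId, mutable.Queue.empty[Edge[String]]) += e)

var cur = 11L // assumed start vertex
while (out.get(cur).exists(_.nonEmpty)) {
  val e = out(cur).dequeue()
  println(s"SrcId = ${e.srcId}, DstId = ${e.dstId}")
  cur = e.dstId
}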
Re: Building Spark on Windows WAS: Any IRC channel on Spark?
Can you look in the build output for the scalastyle warning in the mllib module? Cheers On Mar 17, 2015, at 3:00 AM, Ahmed Nawar ahmed.na...@gmail.com wrote: Dear Yu, with -X I got the error below.

[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ... SUCCESS [ 7.418 s]
[INFO] Spark Project Networking ... SUCCESS [ 16.551 s]
[INFO] Spark Project Shuffle Streaming Service SUCCESS [ 10.392 s]
[INFO] Spark Project Core . SUCCESS [04:26 min]
[INFO] Spark Project Bagel SUCCESS [ 23.876 s]
[INFO] Spark Project GraphX ... SUCCESS [01:02 min]
[INFO] Spark Project Streaming SUCCESS [01:46 min]
[INFO] Spark Project Catalyst . SUCCESS [01:45 min]
[INFO] Spark Project SQL .. SUCCESS [02:16 min]
[INFO] Spark Project ML Library ... FAILURE [02:38 min]
[INFO] Spark Project Tools SKIPPED
[INFO] Spark Project Hive . SKIPPED
[INFO] Spark Project REPL . SKIPPED
[INFO] Spark Project Hive Thrift Server ... SKIPPED
[INFO] Spark Project Assembly . SKIPPED
[INFO] Spark Project External Twitter . SKIPPED
[INFO] Spark Project External Flume Sink .. SKIPPED
[INFO] Spark Project External Flume ... SKIPPED
[INFO] Spark Project External MQTT SKIPPED
[INFO] Spark Project External ZeroMQ .. SKIPPED
[INFO] Spark Project External Kafka ... SKIPPED
[INFO] Spark Project Examples . SKIPPED
[INFO] Spark Project External Kafka Assembly .. SKIPPED
[INFO]
[INFO] BUILD FAILURE
[INFO]
[INFO] Total time: 14:54 min
[INFO] Finished at: 2015-03-17T12:54:19+03:00
[INFO] Final Memory: 76M/1702M
[INFO]
[ERROR] Failed to execute goal org.scalastyle:scalastyle-maven-plugin:0.4.0:check (default) on project spark-mllib_2.10: Failed during scalastyle execution: You have 1 Scalastyle violation(s). -> [Help 1]

org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.scalastyle:scalastyle-maven-plugin:0.4.0:check (default) on project spark-mllib_2.10: Failed during scalastyle execution at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:216) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80) at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51) at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:120) at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:355) at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:155) at org.apache.maven.cli.MavenCli.execute(MavenCli.java:584) at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:216) at org.apache.maven.cli.MavenCli.main(MavenCli.java:160) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289) at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229) at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415) at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356) Caused by: org.apache.maven.plugin.MojoExecutionException: 
Failed during scalastyle execution at org.scalastyle.maven.plugin.ScalastyleViolationCheckMojo.performCheck(ScalastyleViolationCheckMojo.java:238) at org.scalastyle.maven.plugin.ScalastyleViolationCheckMojo.execute(ScalastyleViolationCheckMojo.java:199)
Re: Building Spark on Windows WAS: Any IRC channel on Spark?
Dear Yu, Do you mean scalastyle-output.xml? I copied its content below:

<?xml version="1.0" encoding="UTF-8"?>
<checkstyle version="5.0">
 <file name="C:\Nawwar\Hadoop\spark\spark-1.3.0\mllib\src\main\scala\org\apache\spark\mllib\clustering\LDAModel.scala">
  <error severity="error" message="Input length = 1"/>
 </file>
</checkstyle>

On Tue, Mar 17, 2015 at 4:11 PM, Ted Yu yuzhih...@gmail.com wrote: Can you look in the build output for the scalastyle warning in the mllib module? Cheers On Mar 17, 2015, at 3:00 AM, Ahmed Nawar ahmed.na...@gmail.com wrote: Dear Yu, with -X I got the error below.
Re: Process time series RDD after sortByKey
Hi Imran, This is extremely helpful. This is not only an approach; it also helps me understand how to affect or customize my own DAG effectively. Thanks a lot! Shuai On Monday, March 16, 2015, Imran Rashid iras...@cloudera.com wrote: Hi Shuai, yup, that is exactly what I meant -- implement your own class MyGroupingRDD. This is definitely more detail than a lot of users will need to go into, but it's also not all that scary either. In this case, you want something that is *extremely* close to the existing CoalescedRDD, so start by looking at that code. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala The only thing which is complicated in CoalescedRDD is the PartitionCoalescer, but that is completely irrelevant for you, so you can ignore it. I started writing up a description of what to do but then I realized just writing the code would be easier :) Totally untested, but here you go: https://gist.github.com/squito/c2d1dd5413a60830d6f3 The only really interesting part here is getPartitions: https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L31 That's where you create partitions in your new RDD, which depend on multiple RDDs from the parent. Also note that compute() is very simple: you just concatenate together the iterators from each of the parent RDDs: https://gist.github.com/squito/c2d1dd5413a60830d6f3#file-groupedrdd-scala-L37 let me know how it goes! On Mon, Mar 16, 2015 at 5:15 PM, Shuai Zheng szheng.c...@gmail.com wrote: Hi Imran, I am a bit confused here. Assume I have RDD a with 1000 partitions which has also been sorted. How can I control, when creating RDD b (with 20 partitions), that partitions 1-50 of RDD a map to the 1st partition of RDD b? I don't see any control code/logic here? Your code below: val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions) Does it mean I need to define/develop my own MyGroupingRDD class? I am not very clear how to do that; is there any place I can find an example? I have never created my own RDD class before (not an RDD instance :) ). But this is a very valuable approach to me, so I am eager to learn. Regards, Shuai *From:* Imran Rashid [mailto:iras...@cloudera.com] *Sent:* Monday, March 16, 2015 11:22 AM *To:* Shawn Zheng; user@spark.apache.org *Subject:* Re: Process time series RDD after sortByKey Hi Shuai, On Sat, Mar 14, 2015 at 11:02 AM, Shawn Zheng szheng.c...@gmail.com wrote: Sorry I responded late. Zhan Zhang's solution is very interesting and I looked into it, but it is not what I want. Basically I want to run the job sequentially and also gain parallelism. So if possible, if I have 1000 partitions, the best case is I can run it as 20 subtasks, each one taking partitions 1-50, 51-100, 101-150, etc. If we have the ability to do this, we will gain huge flexibility when we try to process time-series-like data, and a lot of algorithms will benefit from it. yes, this is what I was suggesting you do. You would first create one RDD (a) that has 1000 partitions. Don't worry about the creation of this RDD -- it won't create any tasks, it's just a logical holder of your raw data. Then you create another RDD (b) that depends on your RDD (a), but that only has 20 partitions. Each partition in (b) would depend on a number of partitions from (a). 
As you've suggested, partition 1 in (b) would depend on partitions 1-50 in (a), partition 2 in (b) would depend on partitions 51-100 in (a), etc. Note that RDD (b) still doesn't *do* anything. It's just another logical holder for your data, but this time grouped in the way you want. Then after RDD (b), you would do whatever other transformations you wanted, but now you'd be working w/ 20 partitions:

val rawData1000Partitions = sc.textFile(...) // or whatever
val groupedRawData20Partitions = new MyGroupingRDD(rawData1000Partitions)
groupedRawData20Partitions.map{...}.filter{...}.reduceByKey{...} // etc.

note that this is almost exactly the same as what CoalescedRDD does. However, it might combine the partitions in whatever ways it feels like -- you want them combined in a very particular order. So you'll need to create your own subclass. Back to Zhan Zhang's:

while (iterPartition < RDD.partitions.length) {
  val res = sc.runJob(this, (it: Iterator[T]) => somFunc, iterPartition, allowLocal = true)
  // Some other function after processing one partition.
  iterPartition += 1
}

I am curious how Spark processes this without parallelism: will the individual partition be passed back to the driver to process, or will it just run one task on the node where that partition exists, and then follow with another partition on another node?
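For readers of the archive, here is a rough, untested sketch in the spirit of what Imran describes, modeled on CoalescedRDD (this is not the content of his gist; the class and partition names are invented):

import scala.reflect.ClassTag
import org.apache.spark.{Dependency, NarrowDependency, Partition, TaskContext}
import org.apache.spark.rdd.RDD

// one output partition backed by a contiguous range of parent partitions
class GroupedPartition(override val index: Int, val parentIndices: Seq[Int]) extends Partition

class MyGroupingRDD[T: ClassTag](parent: RDD[T], numGroups: Int)
    extends RDD[T](parent.context, Nil) {

  private val groupSize = math.ceil(parent.partitions.length.toDouble / numGroups).toInt

  override def getPartitions: Array[Partition] =
    (0 until numGroups).map { i =>
      val range = (i * groupSize) until math.min((i + 1) * groupSize, parent.partitions.length)
      new GroupedPartition(i, range)
    }.toArray

  override def getDependencies: Seq[Dependency[_]] = Seq(
    new NarrowDependency[T](parent) {
      override def getParents(partitionId: Int): Seq[Int] =
        partitions(partitionId).asInstanceOf[GroupedPartition].parentIndices
    }
  )

  // concatenate the iterators of the parent partitions, in order
  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    split.asInstanceOf[GroupedPartition].parentIndices.iterator.flatMap { i =>
      parent.iterator(parent.partitions(i), context)
    }
}

With that, the usage from the thread would be new MyGroupingRDD(rawData1000Partitions, 20).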
HiveContext can't find registered function
Hi Guys, I'm registering a function using: sqlc.registerFunction("makeEstEntry", ReutersDataFunctions.makeEstEntry _) Then I register the table and try to query the table using that function and I get: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'makeEstEntry(numest#20,median#21,mean#22,stddev#23,high#24,low#25,currency_#26,units#27,'basic_null_diluted_d) AS FY0#2837, tree: Thanks! Ophir
Downloading data from url
Hi experts! Is there any API in Spark to download data from a URL? I want to download data from a URL in a Spark application, and I want the download to happen on all nodes instead of a single node. Thanks
Re: Downloading data from url
Please take a look at the downloadFile() method in ./core/src/main/scala/org/apache/spark/util/Utils.scala. You can find example usage in doFetchFile(). FYI On Tue, Mar 17, 2015 at 6:52 AM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: Hi experts! Is there any API in Spark to download data from a URL? I want to download data from a URL in a Spark application, and I want the download to happen on all nodes instead of a single node. Thanks
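For downloading inside the job itself (so the work happens on the executors, as the original poster wants), a sketch using plain JDK/Scala I/O, with placeholder URLs:

import java.net.URL
import scala.io.Source

val urls = Seq("http://example.com/a.txt", "http://example.com/b.txt")
val lines = sc.parallelize(urls, urls.size).flatMap { u =>
  val src = Source.fromURL(new URL(u))
  try src.getLines().toList finally src.close()
}

Note that Utils.downloadFile() is Spark-internal API and may change between releases.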
Re: 1.3 release
OK, did you build with YARN support (-Pyarn), and the right incantation of flags like -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.2 or similar? On Tue, Mar 17, 2015 at 2:39 PM, Eric Friedman eric.d.fried...@gmail.com wrote: I did not find that the generic build worked. In fact I also haven't gotten a build from source to work either, though that one might be a case of PEBCAK. In the former case I got errors about the build not having YARN support. On Sun, Mar 15, 2015 at 3:03 AM, Sean Owen so...@cloudera.com wrote: I think (I hope) it's because the generic builds just work. Even though these are of course distributed mostly verbatim in CDH5, with tweaks to be compatible with other stuff at the edges, the stock builds should be fine too. Same for HDP as I understand it. The CDH4 build may work on some builds of CDH4, but I think it is lurking there as a Hadoop 2.0.x plus a certain YARN beta build. I'd prefer to rename it that way, myself, since it doesn't actually work with all of CDH4 anyway. Are the MapR builds there because the stock Hadoop build doesn't work on MapR? That would actually surprise me, but then, why are these two builds distributed? On Sun, Mar 15, 2015 at 6:22 AM, Eric Friedman eric.d.fried...@gmail.com wrote: Is there a reason why the prebuilt releases don't include current CDH distros and YARN support? Eric Friedman
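Concretely, the kind of build being suggested looks like this (the CDH version string is only an example; substitute the version your cluster runs):

./make-distribution.sh --tgz -Pyarn -Phadoop-2.4 -Dhadoop.version=2.5.0-cdh5.3.2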
Re: 1.3 release
I did not find that the generic build worked. In fact I also haven't gotten a build from source to work either, though that one might be a case of PEBCAK. In the former case I got errors about the build not having YARN support. On Sun, Mar 15, 2015 at 3:03 AM, Sean Owen so...@cloudera.com wrote: I think (I hope) it's because the generic builds just work. Even though these are of course distributed mostly verbatim in CDH5, with tweaks to be compatible with other stuff at the edges, the stock builds should be fine too. Same for HDP as I understand. The CDH4 build may work on some builds of CDH4, but I think is lurking there as a Hadoop 2.0.x plus a certain YARN beta build. I'd prefer to rename it that way, myself, since it doesn't actually work with all of CDH4 anyway. Are the MapR builds there because the stock Hadoop build doesn't work on MapR? that would actually surprise me, but then, why are these two builds distributed? On Sun, Mar 15, 2015 at 6:22 AM, Eric Friedman eric.d.fried...@gmail.com wrote: Is there a reason why the prebuilt releases don't include current CDH distros and YARN support? Eric Friedman
org.apache.hadoop.hive.serde2.SerDeException: org.codehaus.jackson.JsonParseException
I have a Hadoop cluster and I need to query the data stored on HDFS using the Spark SQL Thrift server, which is up and running. It is configured to read from a Hive table, an external table corresponding to a set of files stored on HDFS. These files contain JSON data. I am connecting to the Spark SQL Thrift server using beeline. When I execute a simple query like *select * from mytable limit 3*, everything works fine. But when I execute other queries like *select count(*) from mytable*, the following exception is thrown: *org.apache.hadoop.hive.serde2.SerDeException: org.codehaus.jackson.JsonParseException: Unrecognized character escape ' ' (code 32) at [Source: java.io.StringReader@34ef429a; line: 1, column: 351]* What I understand from the exception is that some files contain corrupted JSON. Question 1: am I understanding this correctly? Question 2: how can I find the file(s) causing this problem, given that I have about 3 thousand files and each file contains about 700 lines of JSON data? Question 3: if I am sure that the JSON in the files on HDFS is valid, what should I do?
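For question 2, one hedged approach (the path is a placeholder): re-parse every line of every file with the same Jackson library and report the files that fail, e.g.:

val badFiles = sc.wholeTextFiles("hdfs:///path/to/json/dir")
  .flatMap { case (file, content) =>
    val mapper = new org.codehaus.jackson.map.ObjectMapper()
    val hasBadLine = content.split("\n").exists { line =>
      line.trim.nonEmpty && scala.util.Try(mapper.readTree(line)).isFailure
    }
    if (hasBadLine) Some(file) else None
  }
  .collect()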
Unable to saveAsParquetFile to HDFS since Spark 1.3.0
Hi all, today we tested Spark 1.3.0. Everything went pretty fine, except that I seem to be unable to save an RDD as Parquet to HDFS. A minimal example:

import sqlContext.implicits._

// Reading works fine!
val foo: RDD[String] = spark.textFile("hdfs://...")

// this works
foo.toDF().saveAsParquetFile("/tmp/sparktest") // save to local
foo.saveAsTextFile("/tmp/sparktest") // save to local
foo.saveAsTextFile("hdfs://server/tmp/sparktest") // But even this works!

// this doesn't work
foo.toDF().saveAsParquetFile("hdfs://...")

This throws the following exception, and after quite some googling I am running out of ideas and would be happy about help.

Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs://server/tmp/sparktest_fg, expected: file:/// at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:590) at org.apache.hadoop.fs.FileSystem.makeQualified(FileSystem.java:410) at org.apache.hadoop.fs.FilterFileSystem.makeQualified(FilterFileSystem.java:108) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:252) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:251) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:251) at org.apache.spark.sql.parquet.ParquetRelation2.init(newParquet.scala:370) at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:96) at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:125) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:308) at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1123) at org.apache.spark.sql.DataFrame.saveAsParquetFile(DataFrame.scala:922) at Pi2Parquet$delayedInit$body.apply(Pi2Parquet.scala:45) at scala.Function0$class.apply$mcV$sp(Function0.scala:40) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.App$$anonfun$main$1.apply(App.scala:71) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32) at scala.App$class.main(App.scala:71) at Pi2Parquet$.main(Pi2Parquet.scala:12) at Pi2Parquet.main(Pi2Parquet.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Thanks a lot Franz
Re: Can LBFGS be used on streaming data?
Hi Arunkumar, That looks like it should work. Logically, it's similar to the implementation used by StreamingLinearRegression and StreamingLogisticRegression; see this class: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala which exposes the kind of operation you're describing (for any linear method). The nice thing about the gradient-based methods is that they can use existing MLlib optimization routines in this fairly direct way. Other methods (such as KMeans) require a bit more reengineering. — Jeremy - jeremyfreeman.net @thefreemanlab On Mar 16, 2015, at 6:19 PM, EcoMotto Inc. ecomot...@gmail.com wrote: Hello, I am new to the Spark Streaming API. I wanted to ask if I can apply LBFGS (with LeastSquaresGradient) on streaming data. Currently I am using foreachRDD to iterate over the DStream, and I am generating a model based on each RDD. Am I doing anything logically wrong here? Thank you. Sample code:

val algorithm = new LBFGS(new LeastSquaresGradient(), new SimpleUpdater())
var initialWeights = Vectors.dense(Array.fill(numFeatures)(scala.util.Random.nextDouble()))
var isFirst = true
var model = new LinearRegressionModel(null, 1.0)

parsedData.foreachRDD { rdd =>
  if (isFirst) {
    val weights = algorithm.optimize(rdd, initialWeights)
    val w = weights.toArray
    val intercept = w.head
    model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
    isFirst = false
  } else {
    var ab = ArrayBuffer[Double]()
    ab.insert(0, model.intercept)
    ab.appendAll(model.weights.toArray)
    print("Intercept = " + model.intercept + " :: modelWeights = " + model.weights)
    initialWeights = Vectors.dense(ab.toArray)
    print("Initial Weights: " + initialWeights)
    val weights = algorithm.optimize(rdd, initialWeights)
    val w = weights.toArray
    val intercept = w.head
    model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
  }
}

Best Regards, Arunkumar
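For comparison, the built-in streaming linear method Jeremy points at is used roughly like this (a sketch: trainingStream and testStream are assumed DStreams of LabeledPoint and Vector respectively, and numFeatures a known constant):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD

val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(numFeatures))
model.trainOn(trainingStream)        // update the model on each incoming batch
model.predictOn(testStream).print()  // score new data as it arrives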
Re: Spark @ EC2: Futures timed out Ask timed out
Hi Akhil, Thanks! I think that was it. Had to open a bunch of ports (didn't use spark-ec2, so it didn't do that for me) and the app works fine now. Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/ On Tue, Mar 17, 2015 at 3:26 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you launch the cluster using spark-ec2 script? Just make sure all ports are open for master, slave instances security group. From the error, it seems its not able to connect to the driver program (port 58360) Thanks Best Regards On Tue, Mar 17, 2015 at 3:26 AM, Otis Gospodnetic otis.gospodne...@gmail.com wrote: Hi, I've been trying to run a simple SparkWordCount app on EC2, but it looks like my apps are not succeeding/completing. I'm suspecting some sort of communication issue. I used the SparkWordCount app from http://blog.cloudera.com/blog/2014/04/how-to-run-a-simple-apache-spark-app-in-cdh-5/ Digging through logs I found this: 15/03/16 21:28:20 INFO Utils: Successfully started service 'driverPropsFetcher' on port 58123. Exception in thread main java.lang.reflect.UndeclaredThrowableException at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1563) at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:60) at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:115) at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:163) at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala) * Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] * at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:127) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61) at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548) ... 
4 more Or exceptions like: *Caused by: akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://sparkDriver@ip-10-111-222-111.ec2.internal:58360/), Path(/user/CoarseGrainedScheduler)]] after [3 ms] * at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:333) at akka.actor.Scheduler$$anon$7.run(Scheduler.scala:117) at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:691) at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(Scheduler.scala:467) at akka.actor.LightArrayRevolverScheduler$$anon$8.executeBucket$1(Scheduler.scala:419) at akka.actor.LightArrayRevolverScheduler$$anon$8.nextTick(Scheduler.scala:423) at akka.actor.LightArrayRevolverScheduler$$anon$8.run(Scheduler.scala:375) at java.lang.Thread.run(Thread.java:745) This is in EC2 and I have ports 22, 7077, 8080, and 8081 open to any source. But maybe I need to open something else, too? I do see that the Master sees the Workers, and the Workers do connect to the Master. I did run this in spark-shell, and it runs without problems: scala> val something = sc.parallelize(1 to 1000).collect().filter(_ < 1000) This is how I submitted the job (on the Master machine): $ spark-1.2.1-bin-hadoop2.4/bin/spark-submit --class com.cloudera.sparkwordcount.SparkWordCount --executor-memory 256m --master spark://ip-10-171-32-62:7077 wc-spark/target/sparkwordcount-0.0.1-SNAPSHOT.jar /usr/share/dict/words 0 Any help would be greatly appreciated. Thanks, Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/
Why I didn't see the benefits of using KryoSerializer
Hi, I am new to Spark. I tried to understand the memory benefits of using KryoSerializer. I have a one-box standalone test environment with 24 cores and 24G of memory. I installed Hadoop 2.2 plus Spark 1.2.0, and put one text file of about 1.2G in HDFS. Here are the settings in spark-env.sh:

export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4"
export SPARK_WORKER_MEMORY=32g
export SPARK_DRIVER_MEMORY=2g
export SPARK_EXECUTOR_MEMORY=4g

First test case:

val log = sc.textFile("hdfs://namenode:9000/test_1g/")
log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
log.count()
log.count()

The data is about 3M rows. For the first test case, from the Storage tab in the web UI, I can see "Size in Memory" is 1787M, and "Fraction Cached" is 70% with 7 cached partitions. This matches what I expected; the first count finished in about 17s, and the 2nd count in about 6s.

2nd test case, after restarting spark-shell:

val log = sc.textFile("hdfs://namenode:9000/test_1g/")
log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
log.count()
log.count()

Now from the web UI, I can see "Size in Memory" is 1231M, and "Fraction Cached" is 100% with 10 cached partitions. It looks like caching in the default Java serialized format reduces the memory usage, but at the cost that the first count finished in around 39s and the 2nd count in around 9s. So the job runs slower, with less memory usage. So far I can understand everything that happened, and the tradeoff.

Now the problem comes when I tried to test with KryoSerializer:

SPARK_JAVA_OPTS=-Dspark.serializer=org.apache.spark.serializer.KryoSerializer /opt/spark/bin/spark-shell
val log = sc.textFile("hdfs://namenode:9000/test_1g/")
log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
log.count()
log.count()

First, I saw that the new serializer setting was passed in, as the Spark Properties section of the Environment tab shows "spark.driver.extraJavaOptions -Dspark.serializer=org.apache.spark.serializer.KryoSerializer"; this is not there for the first 2 test cases. But in the Storage tab of the web UI, "Size in Memory" is 1234M, with 100% Fraction Cached and 10 cached partitions. The first count took 46s and the 2nd count took 23s. I don't get the much smaller memory size I expected, and I get longer run times for both counts. Did I do anything wrong? Why is the memory footprint of MEMORY_ONLY_SER with KryoSerializer still the same size as with the default Java serializer, with worse duration?

Thanks
Yong
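One thing worth double-checking here (an observation about the setup, not a confirmed diagnosis): spark.driver.extraJavaOptions only sets a JVM system property on the driver, so the executors may never have picked up Kryo at all. The supported way is to set it as a Spark property, for example:

/opt/spark/bin/spark-shell --conf spark.serializer=org.apache.spark.serializer.KryoSerializer

Also, Kryo's space savings tend to be largest for custom classes that you register with it; for an RDD of plain text lines, the serialized size is close to Java serialization's, which would be consistent with the numbers above.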
Re: Spark SQL. Cast to Bigint
Hi Yin, with HiveContext it works well. Thanks!!! Regards, Miguel Angel. On Fri, Mar 13, 2015 at 3:18 PM, Yin Huai yh...@databricks.com wrote: Are you using SQLContext? Right now, the parser in SQLContext is quite limited in the data type keywords that it handles (see here https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala#L391), and unfortunately bigint is not handled in it right now. We will add other data types in there (https://issues.apache.org/jira/browse/SPARK-6146 is used to track it). Can you try HiveContext for now? On Fri, Mar 13, 2015 at 4:48 AM, Masf masfwo...@gmail.com wrote: Hi. I have a query in Spark SQL and I cannot convert a value to BIGINT: CAST(column AS BIGINT) or CAST(0 AS BIGINT) The output is: Exception in thread "main" java.lang.RuntimeException: [34.62] failure: ``DECIMAL'' expected but identifier BIGINT found Thanks!! Regards. Miguel Ángel -- Regards. Miguel Ángel
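A minimal sketch of the HiveContext route (table and column names are placeholders):

import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
hc.sql("SELECT CAST(someColumn AS BIGINT) FROM someTable")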
High GC time
Hi, I am getting very high GC time in my jobs. For smaller/real-time loads, this becomes a real problem. Below are the task details from a stage I just ran (duration followed by GC time, then the two size columns). What could be the cause of such skewed GC times?

36  2601  0  SUCCESS  PROCESS_LOCAL  2 / Slave1  2015/03/17 11:18:44  20 s  11 s  132.7 KB  135.8 KB
37  2602  0  SUCCESS  PROCESS_LOCAL  2 / Slave1  2015/03/17 11:18:44  15 s  11 s  79.4 KB  82.5 KB
38  2603  0  SUCCESS  PROCESS_LOCAL  1 / Slave2  2015/03/17 11:18:44  2 s   0.7 s  0.0 B     37.8 KB
39  2604  0  SUCCESS  PROCESS_LOCAL  0 / slave3  2015/03/17 11:18:45  21 s  18 s  77.9 KB   79.8 KB
40  2605  0  SUCCESS  PROCESS_LOCAL  2 / Slave1  2015/03/17 11:18:45  14 s  10 s  73.0 KB   74.9 KB
41  2606  0  SUCCESS  PROCESS_LOCAL  2 / Slave1  2015/03/17 11:18:45  14 s  10 s  74.4 KB   76.5 KB
42  2607  0  SUCCESS  PROCESS_LOCAL  0 / Slave3  2015/03/17 11:18:45  12 s  12 s  10.9 KB   12.8 KB

Thanks
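A first diagnostic step (general JVM practice, not something established in this thread) is to enable GC logging on the executors and check whether the time is spent in minor or full collections, e.g. by adding to spark-submit:

--conf spark.executor.extraJavaOptions="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"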
Re: Unable to saveAsParquetFile to HDFS since Spark 1.3.0
This has been fixed by https://github.com/apache/spark/pull/5020 On 3/18/15 12:24 AM, Franz Graf wrote: Hi all, today we tested Spark 1.3.0. Everything went pretty fine except that I seem to be unable to save an RDD as parquet to HDFS.
Spark yarn-client submission example?
Hi, We have a Scala application and we want it to programmatically submit Spark jobs to a Spark-YARN cluster in yarn-client mode. We're running into a lot of classpath issues, e.g. once submitted it looks for jars in our parent Scala application's local directory, jars that it shouldn't need. Our setJars in the SparkContext only mentions our fat jar, which should be all it needs. We are not sure why the other jars are being included once we submit and we don't see a mechanism to control what it wants. Here's a sample error: Diagnostics: java.io.FileNotFoundException: File file:/Users/github/spark/kindling-container/lib/spark-assembly-1.2.1-hadoop2.4.0.jar does not exist Failing this attempt. Failing the application. I read through the user list and there was discussion around possibly using Client.scala? Are there any code examples out there that we could use as reference? thanks, Michal
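A sketch of the kind of programmatic setup being described (every path here is a placeholder; spark.yarn.jar should point at an assembly the cluster can read, e.g. on HDFS, so YARN does not look for it as a local file like in the error above):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("yarn-client")
  .setAppName("example-app")
  .set("spark.yarn.jar", "hdfs:///apps/spark/spark-assembly-1.2.1-hadoop2.4.0.jar")
  .setJars(Seq("/path/to/our-fat.jar")) // the only application jar that should be shipped
val sc = new SparkContext(conf)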
Re: HiveContext can't find registered function
Seems basic_null_diluted_d was not resolved? Can you check whether basic_null_diluted_d is in your table? On Tue, Mar 17, 2015 at 9:34 AM, Ophir Cohen oph...@gmail.com wrote: Hi Guys, I'm registering a function using: sqlc.registerFunction("makeEstEntry", ReutersDataFunctions.makeEstEntry _) Then I register the table and try to query the table using that function and I get: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'makeEstEntry(numest#20,median#21,mean#22,stddev#23,high#24,low#25,currency_#26,units#27,'basic_null_diluted_d) AS FY0#2837, tree: Thanks! Ophir