Re: How to write mapreduce programming in spark by using java on user-defined javaPairRDD?
Hi Missie, In the Java API, you should consider: 1. RDD.map https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#map(scala.Function1,%20scala.reflect.ClassTag) to transform the text 2. RDD.sortBy https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#sortBy(scala.Function1,%20boolean,%20int,%20scala.math.Ordering,%20scala.reflect.ClassTag) to order by LongWritable 3. RDD.saveAsTextFile https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#saveAsTextFile(java.lang.String) to write to HDFS On Tue, Jul 7, 2015 at 7:18 AM, 付雅丹 yadanfu1...@gmail.com wrote: Hi, everyone! I've got a <LongWritable, Text> key/value pair RDD, which I created with the following code: SparkConf conf = new SparkConf().setAppName("MapReduceFileInput"); JavaSparkContext sc = new JavaSparkContext(conf); Configuration confHadoop = new Configuration(); JavaPairRDD<LongWritable, Text> sourceFile = sc.newAPIHadoopFile("hdfs://cMaster:9000/wcinput/data.txt", DataInputFormat.class, LongWritable.class, Text.class, confHadoop); Now I want to transform the JavaPairRDD from <LongWritable, Text> to another <LongWritable, Text>, where the Text content is different. After that, I want to write the Text into HDFS in order of the LongWritable value. But I don't know how to write the map/reduce functions in Spark using the Java language. Can someone help me? Sincerely, Missie.
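For reference, a rough Java sketch of that flow directly on the JavaPairRDD (mapToPair, then sortByKey, then saveAsTextFile). The upper-casing is only a placeholder for whatever rewrite of the Text is needed and the output path is hypothetical; converting the Hadoop Writables to plain Long/String up front also avoids their lack of Java serializability:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// Transform each (LongWritable, Text) pair; put the real Text rewrite here.
JavaPairRDD<Long, String> transformed = sourceFile.mapToPair(
    new PairFunction<Tuple2<LongWritable, Text>, Long, String>() {
      @Override
      public Tuple2<Long, String> call(Tuple2<LongWritable, Text> kv) {
        // placeholder transformation: upper-case the text
        return new Tuple2<Long, String>(kv._1().get(), kv._2().toString().toUpperCase());
      }
    });

// Order by the LongWritable value and write the result to HDFS.
transformed.sortByKey(true).saveAsTextFile("hdfs://cMaster:9000/wcoutput");

Note that saveAsTextFile writes each pair with Tuple2's toString, i.e. lines like (0,SOME TEXT); map to a formatted String first if a different layout is wanted.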
Regarding master node failure
Hi, What happens if the master node fails in the case of Spark Streaming? Would the data be lost? Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Regarding-master-node-failure-tp23701.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: java.lang.OutOfMemoryError: PermGen space
Stati, Change SPARK_REPL_OPTS to SPARK_SUBMIT_OPTS and try again. I faced the same issue and making this change worked for me. I looked at the spark-shell file under the bin dir and found SPARK_SUBMIT_OPTS being used. SPARK_SUBMIT_OPTS=-XX:MaxPermSize=256m bin/spark-shell --master spark://machu:7077 --total-executor-cores 12 --packages com.databricks:spark-csv_2.10:1.0.3 --packages joda-time:joda-time:2.8.1 -SparklineData -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-PermGen-space-tp23472p23702.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Windows - endless Dependency-reduced POM written... in Bagel build
See this thread: http://search-hadoop.com/m/q3RTtxVUrL1AvnPj2 On Tue, Jul 7, 2015 at 10:04 AM, Lincoln Atkinson lat...@microsoft.com wrote: I’m trying to build Spark from source on Windows 8.1, using a recent Cygwin install and JDK 8u45. From the root of my enlistment, I’m running `build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package` The build moves along just fine for a while, until it builds “Spark Project Bagel 1.5.0-SNAPSHOT”. At this point it gets stuck in a seemingly endless loop, repeating “[INFO] Dependency-reduced POM written at: C:\Users\latkin\Source\Repos\spark\bagel\dependency-reduced-pom.xml”. I let this go on for maybe 30-45 min before killing the build. The XML file itself appears to have quite a lot of repeated data. Is this expected, and it will finish eventually? Is there a workaround? I’ve shared my full build log and pom XML at https://gist.github.com/latkin/1bdfeb1380d0dced0601 Thanks, -Lincoln
DataFrame question
Hi All, I am working with DataFrames and have been struggling with this; any pointers would be helpful. I have a JSON file with a schema like this:
 |-- links: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- desc: string (nullable = true)
 |    |    |-- id: string (nullable = true)
I want to fetch id and desc as an RDD like this: RDD[(String, String)]. I am using df.select("links.desc", "links.id").rdd, but that returns an RDD like RDD[(List[String], List[String])]. So for a JSON document with links: [{one,1},{two,2},{three,3}] it should return an RDD of [(one,1),(two,2),(three,3)]. Can anyone tell me how the DataFrame select should be modified?
Re: Windows - endless Dependency-reduced POM written... in Bagel build
Looks like a workaround has gone in: [SPARK-8819] Fix build for maven 3.3.x FYI On Tue, Jul 7, 2015 at 10:09 AM, Ted Yu yuzhih...@gmail.com wrote: See this thread: http://search-hadoop.com/m/q3RTtxVUrL1AvnPj2 On Tue, Jul 7, 2015 at 10:04 AM, Lincoln Atkinson lat...@microsoft.com wrote: I’m trying to build Spark from source on Windows 8.1, using a recent Cygwin install and JDK 8u45. From the root of my enlistment, I’m running `build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package` The build moves along just fine for a while, until it builds “Spark Project Bagel 1.5.0-SNAPSHOT”. At this point it gets stuck in a seemingly endless loop, repeating “[INFO] Dependency-reduced POM written at: C:\Users\latkin\Source\Repos\spark\bagel\dependency-reduced-pom.xml”. I let this go on for maybe 30-45 min before killing the build. The XML file itself appears to have quite a lot of repeated data. Is this expected, and it will finish eventually? Is there a workaround? I’ve shared my full build log and pom XML at https://gist.github.com/latkin/1bdfeb1380d0dced0601 Thanks, -Lincoln
Re: Hibench build fail
bq. Need I specify my spark version Looks like the build used 1.4.0 SNAPSHOT. Please use the 1.4.0 release. Cheers On Mon, Jul 6, 2015 at 11:50 PM, luohui20...@sina.com wrote: Hi grace, recently I have been trying HiBench to evaluate my Spark cluster; however, I got a problem building HiBench, would you help take a look? Thanks. It fails at building Sparkbench, and you may check the attached pic for more info. My Spark version: 1.3.1, Hadoop version: 2.7.0, HiBench version: 4.0, Python 2.6.6. It reports a failure for Spark 1.4 and MR1, which I didn't install in my cluster. Need I specify my Spark version and Hadoop version when I am running bin/build-all.sh? Thanks. Thanks & Best regards! San.Luo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Windows - endless Dependency-reduced POM written... in Bagel build
I'm trying to build Spark from source on Windows 8.1, using a recent Cygwin install and JDK 8u45. From the root of my enlistment, I'm running `build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package` The build moves along just fine for a while, until it builds Spark Project Bagel 1.5.0-SNAPSHOT. At this point it gets stuck in a seemingly endless loop, repeating [INFO] Dependency-reduced POM written at: C:\Users\latkin\Source\Repos\spark\bagel\dependency-reduced-pom.xml. I let this go on for maybe 30-45 min before killing the build. The XML file itself appears to have quite a lot of repeated data. Is this expected, and it will finish eventually? Is there a workaround? I've shared my full build log and pom XML at https://gist.github.com/latkin/1bdfeb1380d0dced0601 Thanks, -Lincoln
Re: Master doesn't start, no logs
Thanks, I tried that, and the result was the same. I still can start a master from the spark-1.4.0-bin-hadoop2.4 pre-built version thought I don't really know what to show more than the strace that I already linked, so I could use any hint for that. -- Henri Maxime Demoulin 2015-07-07 9:53 GMT-04:00 Akhil Das ak...@sigmoidanalytics.com: Can you try renaming the ~/.ivy2 file to ~/.ivy2_backup and build spark1.4.0 again and run it? Thanks Best Regards On Tue, Jul 7, 2015 at 6:27 PM, Max Demoulin maxdemou...@gmail.com wrote: Yes, I do set $SPARK_MASTER_IP. I suspect a more internal issue, maybe due to multiple spark/hdfs instances having successively run on the same machine? -- Henri Maxime Demoulin 2015-07-07 4:10 GMT-04:00 Akhil Das ak...@sigmoidanalytics.com: Strange. What are you having in $SPARK_MASTER_IP? It may happen that it is not able to bind to the given ip but again it should be in the logs. Thanks Best Regards On Tue, Jul 7, 2015 at 12:54 AM, maxdml maxdemou...@gmail.com wrote: Hi, I've been compiling spark 1.4.0 with SBT, from the source tarball available on the official website. I cannot run spark's master, even tho I have built and run several other instance of spark on the same machine (spark 1.3, master branch, pre built 1.4, ...) /starting org.apache.spark.deploy.master.Master, logging to /mnt/spark-1.4.0/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-xx.out failed to launch org.apache.spark.deploy.master.Master: full log in /mnt/spark-1.4.0/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-xx.out/ But the log file is empty. After digging up to ./bin/spark-class, and finally trying to start the master with: ./bin/spark-class org.apache.spark.deploy.master.Master --host 155.99.144.31 I still have the same result. Here is the strace output for this command: http://pastebin.com/bkJVncBm I'm using a 64 bit Xeon, CentOS 6.5, spark 1.4.0, compiled against hadoop 2.5.2 Any idea? :-) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Master-doesn-t-start-no-logs-tp23651.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Windows - endless Dependency-reduced POM written... in Bagel build
That solved it. Thanks! From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: Tuesday, July 07, 2015 10:21 AM To: Lincoln Atkinson Cc: user@spark.apache.org Subject: Re: Windows - endless Dependency-reduced POM written... in Bagel build Looks like a workaround has gone in: [SPARK-8819] Fix build for maven 3.3.x FYI On Tue, Jul 7, 2015 at 10:09 AM, Ted Yu yuzhih...@gmail.commailto:yuzhih...@gmail.com wrote: See this thread: http://search-hadoop.com/m/q3RTtxVUrL1AvnPj2 On Tue, Jul 7, 2015 at 10:04 AM, Lincoln Atkinson lat...@microsoft.commailto:lat...@microsoft.com wrote: I’m trying to build Spark from source on Windows 8.1, using a recent Cygwin install and JDK 8u45. From the root of my enlistment, I’m running `build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package` The build moves along just fine for a while, until it builds “Spark Project Bagel 1.5.0-SNAPSHOT”. At this point it gets stuck in a seemingly endless loop, repeating “[INFO] Dependency-reduced POM written at: C:\Users\latkin\Source\Repos\spark\bagel\dependency-reduced-pom.xml”. I let this go on for maybe 30-45 min before killing the build. The XML file itself appears to have quite a lot of repeated data. Is this expected, and it will finish eventually? Is there a workaround? I’ve shared my full build log and pom XML at https://gist.github.com/latkin/1bdfeb1380d0dced0601 Thanks, -Lincoln
Re: How do we control output part files created by Spark job?
Hi, I tried both approaches, df.repartition(6) and df.coalesce(6), but neither reduces the number of part files. Even after calling the above methods I still see around 200 small part files of about 20 MB each, again as ORC files. On Tue, Jul 7, 2015 at 12:52 AM, Sathish Kumaran Vairavelu vsathishkuma...@gmail.com wrote: Try the coalesce function to limit the number of part files On Mon, Jul 6, 2015 at 1:23 PM kachau umesh.ka...@gmail.com wrote: Hi, I have a couple of Spark jobs which process thousands of files every day. File sizes may vary from MBs to GBs. After a job finishes I usually save using the following code: finalJavaRDD.saveAsParquetFile("/path/in/hdfs"); OR dataFrame.write.format("orc").save("/path/in/hdfs") // storing as an ORC file as of Spark 1.4 The Spark job creates plenty of small part files in the final output directory. As far as I understand, Spark creates a part file for each partition/task; please correct me if I am wrong. How do we control the number of part files Spark creates? Finally I would like to create a Hive table over these parquet/orc directories, and I heard Hive is slow when we have a large number of small files. Please guide me, I am new to Spark. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Is it now possible to incrementally update a graph in GraphX
I found this post back in March 2014. http://apache-spark-user-list.1001560.n3.nabble.com/Incrementally-add-remove-vertices-in-GraphX-td2227.html I was wondering if there is any progress on GraphX Streaming/incremental graph update in GraphX. Or is there a place where I can track the progress on this? BTW, does anyone happen to know why my post is not accepted by the mailing list even I have subscribed? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-now-possible-to-incrementally-update-a-graph-in-GraphX-tp23703.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to deal with null values on LabeledPoint
Hello, reading from spark-csv, I got some lines with missing data (not invalid). I am applying map() to create a LabeledPoint with a dense vector, using map( Row => Row.getDouble(col_index) ). Up to this point it works: res173: org.apache.spark.mllib.regression.LabeledPoint = (-1.530132691E9,[162.89431,13.55811,18.3346818,-1.6653182]) But when running the following code: val model = new LogisticRegressionWithLBFGS(). setNumClasses(2). setValidateData(true). run(data_map) I get: java.lang.RuntimeException: Failed to check null bit for primitive double value. Debugging this, I am pretty sure this is because of rows that look like -2.593849123898,392.293891
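One common way around this, sketched here in Java (the Scala DataFrame API exposes the same na.drop/na.fill), is to drop rows with missing values before mapping to LabeledPoint; the DataFrame variable df and the column names below are hypothetical:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;

// Drop any row with a null in the columns we are about to read,
// so getDouble never hits a missing primitive.
DataFrame cleaned = df.na().drop(new String[] {"label", "f1", "f2", "f3", "f4"});

JavaRDD<LabeledPoint> points = cleaned.javaRDD().map(
    new Function<Row, LabeledPoint>() {
      @Override
      public LabeledPoint call(Row row) {
        return new LabeledPoint(row.getDouble(0),
            Vectors.dense(row.getDouble(1), row.getDouble(2),
                row.getDouble(3), row.getDouble(4)));
      }
    });

df.na().fill(0.0) is an alternative when dropping rows would lose too much data.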
Re: Regarding master node failure
This talk may help - https://spark-summit.org/2015/events/recipes-for-running-spark-streaming-applications-in-production/ On Tue, Jul 7, 2015 at 9:51 AM, swetha swethakasire...@gmail.com wrote: Hi, What happens if the master node fails in the case of Spark Streaming? Would the data be lost? Thanks, Swetha -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Regarding-master-node-failure-tp23701.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Hibench build fail
Hi grace, recently I have been trying HiBench to evaluate my Spark cluster; however, I got a problem building HiBench, would you help take a look? Thanks. It fails at building Sparkbench, and you may check the attached pic for more info. My Spark version: 1.3.1, Hadoop version: 2.7.0, HiBench version: 4.0, Python 2.6.6. It reports a failure for Spark 1.4 and MR1, which I didn't install in my cluster. Need I specify my Spark version and Hadoop version when I am running bin/build-all.sh? Thanks. Thanks & Best regards! San.Luo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Master doesn't start, no logs
Strange. What are you having in $SPARK_MASTER_IP? It may happen that it is not able to bind to the given ip but again it should be in the logs. Thanks Best Regards On Tue, Jul 7, 2015 at 12:54 AM, maxdml maxdemou...@gmail.com wrote: Hi, I've been compiling spark 1.4.0 with SBT, from the source tarball available on the official website. I cannot run spark's master, even tho I have built and run several other instance of spark on the same machine (spark 1.3, master branch, pre built 1.4, ...) /starting org.apache.spark.deploy.master.Master, logging to /mnt/spark-1.4.0/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-xx.out failed to launch org.apache.spark.deploy.master.Master: full log in /mnt/spark-1.4.0/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-xx.out/ But the log file is empty. After digging up to ./bin/spark-class, and finally trying to start the master with: ./bin/spark-class org.apache.spark.deploy.master.Master --host 155.99.144.31 I still have the same result. Here is the strace output for this command: http://pastebin.com/bkJVncBm I'm using a 64 bit Xeon, CentOS 6.5, spark 1.4.0, compiled against hadoop 2.5.2 Any idea? :-) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Master-doesn-t-start-no-logs-tp23651.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to debug java.io.OptionalDataException issues
Did you try kryo? Wrap everything with kryo and see if you are still hitting the exception. (At least you could see a different exception stack). Thanks Best Regards On Tue, Jul 7, 2015 at 6:05 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote: Hi folks, suffering from a pretty strange issue: Is there a way to tell what object is being successfully serialized/deserialized? I have a maven-installed jar that works well when fat jarred within another, but shows the following stack when marked as provided and copied to the runtime classpath...I'm pretty puzzled but can't find any good way to debug what is causing unhappiness? 15/07/07 00:24:23 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, osd04.shaka.rum.tn.akamai.com): java.io.OptionalDataException at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1370) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:366) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:366) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
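For reference, switching the serializer as suggested is a one-line configuration change; a minimal Java sketch (the class to register is hypothetical and registration is optional):

import org.apache.spark.SparkConf;

// Use Kryo for data serialization; a different (or clearer) stack trace
// often helps pinpoint the class that fails to round-trip.
SparkConf conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
// Optionally register the classes that are shipped between driver and executors.
// conf.registerKryoClasses(new Class<?>[] { MyRecord.class });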
SparkSQL OOM issue
Dear all, We've tried to use SparkSQL to do an insert from table A to table B. With the exact same SQL script, Hive is able to finish it, but Spark 1.3.1 always ends with an OOM issue. We tried several configurations, including: --executor-cores 2 --num-executors 300 --executor-memory 7g sconf.set("spark.storage.memoryFraction", "0") but none of them changes the result of the error: java.lang.OutOfMemoryError: GC overhead limit exceeded Is there any other configuration we can make? Thanks! --- TSMC PROPERTY This email communication (and any attachments) is proprietary information for the sole use of its intended recipient. Any unauthorized review, use or distribution by anyone other than the intended recipient is strictly prohibited. If you are not the intended recipient, please notify the sender by replying to this email, and then delete this email and any copies of it immediately. Thank you. --- - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SparkSQL OOM issue
Hi, Where did OOM happened? In Driver or executor? Sometimes SparkSQL Driver OOM on tables with large number partitions. If so, you might want to increase it in spark-defaults.conf spark.driver.memory Shawn On Jul 7, 2015, at 3:58 PM, shsh...@tsmc.com wrote: Dear all, We've tried to use sparkSql to do some insert from A table to B table action where using the exact same SQL script, hive is able to finish it but Spark 1.3.1 would always end with OOM issue; we tried several configuration including: --executor-cores 2 --num-executors 300 --executor-memory 7g sconf.set(spark.storage.memoryFraction, 0) but none of them can change the result of error: java.lang.OutOfMemoryError: GC overhead limit exceeded is there any other configuration we can make? Thanks! --- TSMC PROPERTY This email communication (and any attachments) is proprietary information for the sole use of its intended recipient. Any unauthorized review, use or distribution by anyone other than the intended recipient is strictly prohibited. If you are not the intended recipient, please notify the sender by replying to this email, and then delete this email and any copies of it immediately. Thank you. --- - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
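A minimal spark-defaults.conf entry for that (the value is only illustrative; size it to the number of partitions and the amount of table metadata involved):

spark.driver.memory    8g

The same setting can be passed per job with spark-submit --driver-memory 8g.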
Re: Can we allow executor to exit when tasks fail too many time?
Any Response? 2015-07-06 12:28 GMT+08:00 Tao Li litao.bupt...@gmail.com: Node cloud10141049104.wd.nm.nop.sogou-op.org and cloud101417770.wd.nm.ss.nop.sogou-op.org failed too many times, I want to know if it can be auto offline when failed too many times? 2015-07-06 12:25 GMT+08:00 Tao Li litao.bupt...@gmail.com: I have a long live spark application running on YARN. In some nodes, it try to write to the shuffle path in the shuffle map task. But the root path /search/hadoop10/yarn_local/usercache/spark/ was deleted, so the task is failed. So every time when running shuffle map task on this node, it was always failed due to the root path not existed. I want to know if can set the executor max task failed num? If the task failed num exceed the threshold, we can let the exectuor offline and offer a new executor by driver? shuffle path : /search/hadoop10/yarn_local/usercache/spark/appcache/application_1434370929997_155180/spark-local-20150703120414-a376/0e/shuffle_20002_720_0.data
HiveContext throws org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
Just trying to get started with Spark and attempting to use HiveContext using spark-shell to interact with existing Hive tables on my CDH cluster but keep running into the errors (pls see below) when I do 'hiveContext.sql(show tables)'. Wanted to know what all JARs need to be included to have this working. Thanks! java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:472) at org.apache.spark.sql.hive.HiveContext.sessionState$lzycompute(HiveContext.scala:229) at org.apache.spark.sql.hive.HiveContext.sessionState(HiveContext.scala:225) at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:241) at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:240) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:86) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:31) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:36) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:38) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:40) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:42) at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:44) at $iwC$$iwC$$iwC$$iwC.init(console:46) at $iwC$$iwC$$iwC.init(console:48) at $iwC$$iwC.init(console:50) at $iwC.init(console:52) at init(console:54) at .init(console:58) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1488) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(RetryingMetaStoreClient.java:64) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:74) at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2845) at
Maintain Persistent Connection with Hive meta store
Hi I am new to Apache Spark and I have tried to query hive tables using Apache Spark Sql. First I have tried it in Spark-shell where I can query 1 lakh records from hive table within a second. Then I have tried in a java code which always take more than 10 seconds and I have noted that each time when I run that jar it tries to make connection with hive metastore. can any one tell me how to maintain the connection between Apache spark and Hive metastore or else how to achieve that same in java. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Maintain-Persistent-Connection-with-Hive-meta-store-tp23664.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: sparkr-submit additional R files
You can just use `--files` and I think it should work. Let us know on https://issues.apache.org/jira/browse/SPARK-6833 if it doesn't work as expected. Thanks Shivaram On Tue, Jul 7, 2015 at 5:13 AM, Michał Zieliński zielinski.mich...@gmail.com wrote: Hi all, *spark-submit* for Python and Java/Scala has *--py-files* and *--jars* options for submitting additional files on top of the main application. Is there any such option for *sparkr-submit*? I know that there is *includePackage() *R function to add library dependencies, but can you add other sources that are not R libraries (e.g. additional code repositories?). I really appreciate your help. Thanks, Michael
How to change hive database?
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.hive.HiveContext I'm getting org.apache.spark.sql.catalyst.analysis.NoSuchTableException from: val dataframe = hiveContext.table("other_db.mytable") Do I have to change the current database to access it? Is it possible to do this? I'm guessing that the database.table syntax that I used in hiveContext.table is not recognized. I have no problems accessing tables in the database called default, and I can list tables in other_db with hiveContext.tableNames("other_db"). Using Spark 1.4.0.
Re: How to change hive database?
See this thread http://search-hadoop.com/m/q3RTt0NFls1XATV02 Cheers On Tue, Jul 7, 2015 at 11:07 AM, Arun Luthra arun.lut...@gmail.com wrote: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.hive.HiveContext I'm getting org.apache.spark.sql.catalyst.analysis.NoSuchTableException from: val dataframe = hiveContext.table(other_db.mytable) Do I have to change current database to access it? Is it possible to do this? I'm guessing that the database.table syntax that I used in hiveContext.table is not recognized. I have no problems accessing tables in the database called default. I can list tables in other_db with hiveContext.tableNames(other_db) Using Spark 1.4.0.
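For reference, a hedged sketch of the usual workarounds (shown in Java; the Scala calls are the same): qualify the table inside a SQL statement, or switch the session's current database first.

import org.apache.spark.sql.DataFrame;

// A qualified name resolves through SQL even when table("other_db.mytable") does not.
DataFrame viaSql = hiveContext.sql("SELECT * FROM other_db.mytable");

// Or switch the current database, then use the unqualified table name.
hiveContext.sql("USE other_db");
DataFrame viaTable = hiveContext.table("mytable");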
(de)serialize DStream
In Spark Streaming, using updateStateByKey requires the generated DStream to be checkpointed. It seems that it always uses JavaSerializer, no matter what I set for spark.serializer. Can I use KryoSerializer for checkpointing? If not, I assume the key and value types have to be Serializable? Chen
Re: how to black list nodes on the cluster
Hi again, Ok, now I do not know of any way to fix the problem other than deleting the bad machine from the config + restart .. And you will need admin privileges on the cluster for that :( However, before we give up on the speculative execution, I suspect that the task is being run again and again on the same faulty machine because that is where the data resides. You could try to store / persist your RDD with MEMORY_ONLY_2 or MEMORY_AND_DISK_2, as that will force the creation of a replica of the data on another node. Thus, with two nodes, the scheduler may choose to execute the speculative task on the second node (I'm not sure about this as I am just not familiar enough with the Spark scheduler's priorities). I'm not very hopeful but it may be worth a try (if you have the disk/RAM space to be able to afford to duplicate all the data, that is). If not, I am afraid I am out of ideas ;) Regards and good luck, Gylfi. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-black-list-nodes-on-the-cluster-tp23650p23704.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
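The suggestion above is a one-line change; a rough Java sketch (the Scala call is identical):

import org.apache.spark.storage.StorageLevel;

// Persist with a replicated storage level so a copy of each partition lives on a
// second node, giving the scheduler somewhere other than the faulty machine to run.
rdd.persist(StorageLevel.MEMORY_AND_DISK_2());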
Re: DataFrame question
You probably want to explode the array to produce one row per element: df.select(explode(df("links")).alias("link")) On Tue, Jul 7, 2015 at 10:29 AM, Naveen Madhire vmadh...@umail.iu.edu wrote: Hi All, I am working with DataFrames and have been struggling with this; any pointers would be helpful. I have a JSON file with the schema: links: array (nullable = true) |-- element: struct (containsNull = true) |-- desc: string (nullable = true) |-- id: string (nullable = true) I want to fetch id and desc as an RDD like this: RDD[(String, String)]. I am using df.select("links.desc", "links.id").rdd, but that returns an RDD like RDD[(List[String], List[String])]. So for a JSON document with links: [{one,1},{two,2},{three,3}] it should return an RDD of [(one,1),(two,2),(three,3)]. Can anyone tell me how the DataFrame select should be modified?
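To then get the RDD[(String, String)] that was asked for, a rough Java sketch building on that answer (the Scala version is analogous):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import scala.Tuple2;
import static org.apache.spark.sql.functions.explode;

// One row per array element, then pull the two struct fields out of each row.
DataFrame exploded = df.select(explode(df.col("links")).alias("link"));
JavaRDD<Tuple2<String, String>> pairs = exploded
    .select("link.desc", "link.id")
    .javaRDD()
    .map(new Function<Row, Tuple2<String, String>>() {
      @Override
      public Tuple2<String, String> call(Row row) {
        return new Tuple2<String, String>(row.getString(0), row.getString(1));
      }
    });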
What else is need to setup native support of BLAS/LAPACK with Spark?
Is there more documentation on what is needed to set up BLAS/LAPACK native support with Spark? I've built Spark with the -Pnetlib-lgpl flag and see that the netlib classes are in the assembly jar:
jar tvf spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native
  6625 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefARPACK.class
 21123 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefBLAS.class
178334 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefLAPACK.class
  6640 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemARPACK.class
 21138 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemBLAS.class
178349 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemLAPACK.class
Also I see the following in /usr/lib64:
ls /usr/lib64/libblas*
libblas.a  libblas.so  libblas.so.3  libblas.so.3.2  libblas.so.3.2.1
ls /usr/lib64/liblapack*
liblapack.a  liblapack_pic.a  liblapack.so  liblapack.so.3  liblapack.so.3.2  liblapack.so.3.2.1
But I still see the following in the Spark logs:
15/07/07 15:36:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
15/07/07 15:36:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
15/07/07 15:36:26 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
15/07/07 15:36:26 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
Anything in this process I missed? Thanks, Arun
Re: How to implement top() and filter() on object List for JavaRDD
bq. my class has already implemented the java.io.Serializable Can you show the code for Model.User class ? Cheers On Tue, Jul 7, 2015 at 8:18 AM, Hafsa Asif hafsa.a...@matchinguu.com wrote: Thank u so much for the solution. I run the code like this, JavaRDDUser rdd = context.parallelize(usersList); JavaRDDUser rdd_sorted_users= rdd.sortBy(new FunctionUser,String(){ @Override public String call(User usr1) throws Exception { String userName = usr1.getUserName().toUpperCase(); return userName ; } }, false, 1); User user_top=rdd_sorted_users.first(); System.out.println(The top user is :+user_top.getUserName()); But it is giving me this exception, however my class has already implemented the java.io.Serializable 15/07/07 08:13:07 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it. java.io.NotSerializableException: Model.User at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:59) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81) at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168) at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.scheduler.TaskSchedulerImpl.org $apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:226) at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:295) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:293) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:293) at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalBackend.scala:79) at
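For comparison, a minimal sketch of a Java bean that passes Java serialization (the class and field names here are only illustrative): besides the class itself implementing Serializable, every non-transient field it holds must be serializable too, and if User is a non-static inner class its enclosing instance gets dragged into serialization as well.

import java.io.Serializable;

// Minimal serializable POJO: the class and all of its (non-transient) fields
// must be serializable for the task serializer to accept it.
public class User implements Serializable {
  private static final long serialVersionUID = 1L;

  private String userName;

  public String getUserName() {
    return userName;
  }

  public void setUserName(String userName) {
    this.userName = userName;
  }
}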
Re: is it possible to disable -XX:OnOutOfMemoryError=kill %p for the executors?
SIGTERM on YARN generally means the NM is killing your executor because it's running over its requested memory limits. Check your NM logs to make sure. And then take a look at the memoryOverhead setting for driver and executors (http://spark.apache.org/docs/latest/running-on-yarn.html). On Tue, Jul 7, 2015 at 7:43 AM, Kostas Kougios kostas.koug...@googlemail.com wrote: I've recompiled spark deleting the -XX:OnOutOfMemoryError=kill declaration, but still I am getting a SIGTERM! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/is-it-possible-to-disable-XX-OnOutOfMemoryError-kill-p-for-the-executors-tp23680p23687.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo
Re: How do we control output part files created by Spark job?
Hi. I am just wondering if the rdd was actually modified. Did you test it by printing rdd.partitions.length before and after? Regards, Gylfi. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649p23705.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
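A quick Java sketch of that check, which also makes sure the write goes through the coalesced DataFrame rather than the original one (variable names and the path are hypothetical):

import org.apache.spark.sql.DataFrame;

// Print the partition count before and after coalescing.
System.out.println("before: " + dataFrame.rdd().partitions().length);
DataFrame reduced = dataFrame.coalesce(6);
System.out.println("after: " + reduced.rdd().partitions().length);

// The save must be called on the coalesced DataFrame.
reduced.write().format("orc").save("/path/in/hdfs");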
Re: unable to bring up cluster with ec2 script
Sorry, I can't help with this issue, but if you are interested in a simple way to launch a Spark cluster on Amazon, Spark is now offered as an application in Amazon EMR. With this you can have a full cluster with a few clicks: https://aws.amazon.com/blogs/aws/new-apache-spark-on-amazon-emr/ - Arun On Tue, Jul 7, 2015 at 4:34 PM, Pagliari, Roberto rpagli...@appcomsci.com wrote: I'm following the tutorial about Apache Spark on EC2. The output is the following: $ ./spark-ec2 -i ../spark.pem -k spark --copy launch spark-training Setting up security groups... Searching for existing cluster spark-training... Latest Spark AMI: ami-19474270 Launching instances... Launched 5 slaves in us-east-1d, regid = r-59a0d4b6 Launched master in us-east-1d, regid = r-9ba2d674 Waiting for instances to start up... Waiting 120 more seconds... Copying SSH key ../spark.pem to master... ssh: connect to host ec2-54-152-15-165.compute-1.amazonaws.com port 22: Connection refused Error connecting to host Command 'ssh -t -o StrictHostKeyChecking=no -i ../spark.pem r...@ec2-54-152-15-165.compute-1.amazonaws.com 'mkdir -p ~/.ssh'' returned non-zero exit status 255, sleeping 30 ssh: connect to host ec2-54-152-15-165.compute-1.amazonaws.com port 22: Connection refused Error connecting to host Command 'ssh -t -o StrictHostKeyChecking=no -i ../spark.pem r...@ec2-54-152-15-165.compute-1.amazonaws.com 'mkdir -p ~/.ssh'' returned non-zero exit status 255, sleeping 30 ssh: Could not resolve hostname ec2-54-152-15-165.compute-1.amazonaws.com: Name or service not known Error connecting to host Command 'ssh -t -o StrictHostKeyChecking=no -i ../spark.pem r...@ec2-54-152-15-165.compute-1.amazonaws.com 'mkdir -p ~/.ssh'' returned non-zero exit status 255, sleeping 30 ssh: connect to host ec2-54-152-15-165.compute-1.amazonaws.com port 22: Connection refused Traceback (most recent call last): File ./spark_ec2.py, line 925, in module main() File ./spark_ec2.py, line 766, in main setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, True) File ./spark_ec2.py, line 406, in setup_cluster ssh(master, opts, 'mkdir -p ~/.ssh') File ./spark_ec2.py, line 712, in ssh raise e subprocess.CalledProcessError: Command 'ssh -t -o StrictHostKeyChecking=no -i ../spark.pem r...@ec2-54-152-15-165.compute-1.amazonaws.com 'mkdir -p ~/.ssh'' returned non-zero exit status 255 However, I can see the six instances created on my EC2 console, and I could even get the name of the master. I'm not sure how to fix the ssh issue (my region is US EST).
Re: Spark Kafka Direct Streaming
When you enable checkpointing by setting the checkpoint directory, you enable metadata checkpointing. Data checkpointing kicks in only if you are using a DStream operation that requires it, or you are enabling Write Ahead Logs to prevent data loss on driver failure. More discussion - https://spark-summit.org/2015/events/recipes-for-running-spark-streaming-applications-in-production/ On Tue, Jul 7, 2015 at 7:42 AM, abi_pat present.boiling2...@gmail.com wrote: Hi, I am using the new experimental Direct Stream API. Everything is working fine but when it comes to fault tolerance, I am not sure how to achieve it. Presently my Kafka config map looks like this configMap.put(zookeeper.connect,192.168.51.98:2181); configMap.put(group.id, UUID.randomUUID().toString()); configMap.put(auto.offset.reset,smallest); configMap.put(auto.commit.enable,true); configMap.put(topics,IPDR31); configMap.put(kafka.consumer.id,kafkasparkuser); configMap.put(bootstrap.servers,192.168.50.124:9092); SetString topic = new HashSetString(); topic.add(IPDR31); JavaPairInputDStreambyte[], byte[] kafkaData = KafkaUtils.createDirectStream(js,byte[].class,byte[].class,DefaultDecoder.class,DefaultDecoder.class,configMap,topic); Questions - Q1- Is my Kafka configuration correct or should it be changed? Q2- I also looked into the Checkpointing but in my usecase, Data checkpointing is not required but meta checkpointing is required. Can I achieve this, i.e. enabling meta checkpointing and not the data checkpointing? Thanks Abhishek Patel -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kafka-Direct-Streaming-tp23685.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
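A rough Java sketch of wiring that up for the direct stream (the SparkConf setup, checkpoint path and batch interval are hypothetical): setting the checkpoint directory enables the metadata checkpointing described above, and getOrCreate lets a restarted driver recover from it.

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

final SparkConf sparkConf = new SparkConf().setAppName("kafka-direct");
final String checkpointDir = "hdfs:///checkpoints/kafka-direct";

JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir,
    new JavaStreamingContextFactory() {
      @Override
      public JavaStreamingContext create() {
        JavaStreamingContext js = new JavaStreamingContext(sparkConf, Durations.seconds(10));
        js.checkpoint(checkpointDir);
        // Build createDirectStream and the rest of the pipeline here, inside the
        // factory, so the DAG is re-created on recovery from the checkpoint.
        return js;
      }
    });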
unable to bring up cluster with ec2 script
I'm following the tutorial about Apache Spark on EC2. The output is the following: $ ./spark-ec2 -i ../spark.pem -k spark --copy launch spark-training Setting up security groups... Searching for existing cluster spark-training... Latest Spark AMI: ami-19474270 Launching instances... Launched 5 slaves in us-east-1d, regid = r-59a0d4b6 Launched master in us-east-1d, regid = r-9ba2d674 Waiting for instances to start up... Waiting 120 more seconds... Copying SSH key ../spark.pem to master... ssh: connect to host ec2-54-152-15-165.compute-1.amazonaws.com port 22: Connection refused Error connecting to host Command 'ssh -t -o StrictHostKeyChecking=no -i ../spark.pem r...@ec2-54-152-15-165.compute-1.amazonaws.com 'mkdir -p ~/.ssh'' returned non-zero exit status 255, sleeping 30 ssh: connect to host ec2-54-152-15-165.compute-1.amazonaws.com port 22: Connection refused Error connecting to host Command 'ssh -t -o StrictHostKeyChecking=no -i ../spark.pem r...@ec2-54-152-15-165.compute-1.amazonaws.com 'mkdir -p ~/.ssh'' returned non-zero exit status 255, sleeping 30 ssh: Could not resolve hostname ec2-54-152-15-165.compute-1.amazonaws.com: Name or service not known Error connecting to host Command 'ssh -t -o StrictHostKeyChecking=no -i ../spark.pem r...@ec2-54-152-15-165.compute-1.amazonaws.com 'mkdir -p ~/.ssh'' returned non-zero exit status 255, sleeping 30 ssh: connect to host ec2-54-152-15-165.compute-1.amazonaws.com port 22: Connection refused Traceback (most recent call last): File ./spark_ec2.py, line 925, in module main() File ./spark_ec2.py, line 766, in main setup_cluster(conn, master_nodes, slave_nodes, zoo_nodes, opts, True) File ./spark_ec2.py, line 406, in setup_cluster ssh(master, opts, 'mkdir -p ~/.ssh') File ./spark_ec2.py, line 712, in ssh raise e subprocess.CalledProcessError: Command 'ssh -t -o StrictHostKeyChecking=no -i ../spark.pem r...@ec2-54-152-15-165.compute-1.amazonaws.com 'mkdir -p ~/.ssh'' returned non-zero exit status 255 However, I can see the six instances created on my EC2 console, and I could even get the name of the master. I'm not sure how to fix the ssh issue (my region is US EST).
Does spark supports the Hive function posexplode function?
I am trying to use the posexplode function in the HiveContext to auto-generate a sequence number. This feature is supposed to be available Hive 0.13.0. SELECT name, phone FROM contact LATERAL VIEW posexplode(phoneList.phoneNumber) phoneTable AS pos, phone My test program failed with the following java.lang.ClassNotFoundException: posexplode at java.net.URLClassLoader.findClass(URLClassLoader.java:665) at java.lang.ClassLoader.loadClassHelper(ClassLoader.java:942) at java.lang.ClassLoader.loadClass(ClassLoader.java:851) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) at java.lang.ClassLoader.loadClass(ClassLoader.java:827) at org.apache.spark.sql.hive.HiveFunctionWrapper.createFunction(Shim13.scala:147) at org.apache.spark.sql.hive.HiveGenericUdtf.function$lzycompute(hiveUdfs.scala:274) at org.apache.spark.sql.hive.HiveGenericUdtf.function(hiveUdfs.scala:274) Does spark support this Hive function posexplode? If not, how to patch it to support this? I am on Spark 1.3.1 Thanks, Jeff Li
Re: How do we control output part files created by Spark job?
Hi, Did you try to reduce number of executors and cores? usually num-executors * executor-cores = number of parallel tasks, so you can reduce number of parallel tasks in command line like ./bin/spark-submit --class org.apache.spark.examples.SparkPi \ --master yarn-cluster \ --num-executors 3 \ --driver-memory 4g \ --executor-memory 2g \ --executor-cores 1 \ --queue thequeue \ lib/spark-examples*.jar \ 10 for more details see https://spark.apache.org/docs/1.2.0/running-on-yarn.html -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649p23706.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Best practice for using singletons on workers (seems unanswered) ?
Hi, I am seeing a lot of posts on singletons vs. broadcast variables, such as * http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html * http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219 What's the best approach to instantiate an object once and have it be reused by the worker(s). E.g. I have an object that loads some static state such as e.g. a dictionary/map, is a part of 3rd party API and is not serializable. I can't seem to get it to be a singleton on the worker side as the JVM appears to be wiped on every request so I get a new instance. So the singleton doesn't stick. Is there an approach where I could have this object or a wrapper of it be a broadcast var? Can Kryo get me there? would that basically mean writing a custom serializer? However, the 3rd party object may have a bunch of member vars hanging off it, so serializing it properly may be non-trivial... Any pointers/hints greatly appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-using-singletons-on-workers-seems-unanswered-tp23692.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
spark-submit can not resolve spark-hive_2.10
I want to add spark-hive as a dependence to submit my job, but it seems that spark-submit can not resolve it. $ ./bin/spark-submit \ → --packages org.apache.spark:spark-hive_2.10:1.4.0,org.postgresql:postgresql:9.3-1103-jdbc3,joda-time:joda-time:2.8.1 \ → --class fr.leboncoin.etl.jobs.dwh.AdStateTraceDWHTransform \ → --master spark://localhost:7077 \ Ivy Default Cache set to: /home/invkrh/.ivy2/cache The jars for the packages stored in: /home/invkrh/.ivy2/jars https://repository.jboss.org/nexus/content/repositories/releases/ added as a remote repository with the name: repo-1 :: loading settings :: url = jar:file:/home/invkrh/workspace/scala/spark/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.2.0.jar!/org/apache/ivy/core/settings/ivysettings.xml org.apache.spark#spark-hive_2.10 added as a dependency org.postgresql#postgresql added as a dependency joda-time#joda-time added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0 confs: [default] found org.postgresql#postgresql;9.3-1103-jdbc3 in local-m2-cache found joda-time#joda-time;2.8.1 in central :: resolution report :: resolve 139ms :: artifacts dl 3ms :: modules in use: joda-time#joda-time;2.8.1 from central in [default] org.postgresql#postgresql;9.3-1103-jdbc3 from local-m2-cache in [default] - | |modules|| artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| - | default | 2 | 0 | 0 | 0 || 2 | 0 | - :: retrieving :: org.apache.spark#spark-submit-parent confs: [default] 0 artifacts copied, 2 already retrieved (0kB/6ms) Exception in thread main java.lang.NoClassDefFoundError: org/apache/spark/sql/hive/HiveContext at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:633) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.hive.HiveContext at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 7 more Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/07/07 16:57:59 INFO Utils: Shutdown hook called Any help is appreciated. Thank you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-can-not-resolve-spark-hive-2-10-tp23695.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark standalone cluster - Output file stored in temporary directory in worker
I think the properties that you have in your hdfs-site.xml should go in the core-site.xml (at least for the namenode.name and datanode.data ones). I might be wrong here, but that's what I have in my setup. You should also add hadoop.tmp.dir in your core-site.xml. That might be the source of your inconsistency. As for hadoop-env.sh, I just use it to export variables such as HADOOP_PREFIX, LOG_DIR, CONF_DIR and JAVA_HOME. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-cluster-Output-file-stored-in-temporary-directory-in-worker-tp23653p23697.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to implement top() and filter() on object List for JavaRDD
Thank u so much for the solution. I run the code like this, JavaRDDUser rdd = context.parallelize(usersList); JavaRDDUser rdd_sorted_users= rdd.sortBy(new FunctionUser,String(){ @Override public String call(User usr1) throws Exception { String userName = usr1.getUserName().toUpperCase(); return userName ; } }, false, 1); User user_top=rdd_sorted_users.first(); System.out.println(The top user is :+user_top.getUserName()); But it is giving me this exception, however my class has already implemented the java.io.Serializable 15/07/07 08:13:07 ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it. java.io.NotSerializableException: Model.User at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441) at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$writeObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:59) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) at org.apache.spark.rdd.ParallelCollectionPartition.writeObject(ParallelCollectionRDD.scala:51) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81) at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:168) at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:231) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.scheduler.TaskSchedulerImpl.org $apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:226) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:295) at 
org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:293) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293) at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:293) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:293) at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalBackend.scala:79) at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:58) at org.apache.spark.rpc.akka.AkkaRpcEnv.org $apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:178) at
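For reference, a minimal self-contained Java sketch of the same sortBy call that serializes cleanly. The class and field names here are illustrative, not the poster's actual Model.User; the usual causes of a NotSerializableException like the one above are a non-serializable field inside the class, a non-static inner class dragging its outer instance into serialization, or an older copy of the class (without Serializable) still on the classpath.

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class TopUserExample {
  // Top-level (or static nested) POJO whose fields are all serializable.
  public static class User implements Serializable {
    private final String userName;
    public User(String userName) { this.userName = userName; }
    public String getUserName() { return userName; }
  }

  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("TopUser").setMaster("local"));
    List<User> usersList = Arrays.asList(new User("Jack"), new User("Jill"), new User("Bob"));
    JavaRDD<User> rdd = sc.parallelize(usersList);
    // Sort descending by upper-cased user name, then take the first element.
    JavaRDD<User> sorted = rdd.sortBy(new Function<User, String>() {
      @Override
      public String call(User usr) { return usr.getUserName().toUpperCase(); }
    }, false, 1);
    System.out.println("The top user is: " + sorted.first().getUserName());
    sc.stop();
  }
}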
Re: Best practice for using singletons on workers (seems unanswered) ?
Would it be possible to have a wrapper class that just represents a reference to a singleton holding the 3rd party object? It could proxy over calls to the singleton object which will instantiate a private instance of the 3rd party object lazily? I think something like this might work if the workers have the singleton object in their classpath. here's a rough sketch of what I was thinking: object ThirdPartySingleton { private lazy val thirdPartyObj = ... def someProxyFunction() = thirdPartyObj.() } class ThirdPartyReference extends Serializable { def someProxyFunction() = ThirdPartySingleton.someProxyFunction() } also found this SO post: http://stackoverflow.com/questions/26369916/what-is-the-right-way-to-have-a-static-object-on-all-workers On Tue, Jul 7, 2015 at 11:04 AM, dgoldenberg dgoldenberg...@gmail.com wrote: Hi, I am seeing a lot of posts on singletons vs. broadcast variables, such as * http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html * http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219 What's the best approach to instantiate an object once and have it be reused by the worker(s). E.g. I have an object that loads some static state such as e.g. a dictionary/map, is a part of 3rd party API and is not serializable. I can't seem to get it to be a singleton on the worker side as the JVM appears to be wiped on every request so I get a new instance. So the singleton doesn't stick. Is there an approach where I could have this object or a wrapper of it be a broadcast var? Can Kryo get me there? would that basically mean writing a custom serializer? However, the 3rd party object may have a bunch of member vars hanging off it, so serializing it properly may be non-trivial... Any pointers/hints greatly appreciated. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-using-singletons-on-workers-seems-unanswered-tp23692.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
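A rough Java version of the same idea, under the assumption that the 3rd-party class is on the executors' classpath (ThirdPartyClient below is purely a placeholder standing in for the real, non-serializable API). Nothing heavyweight is captured by the closure; the object is created lazily, once per executor JVM:

// Placeholder for the real non-serializable 3rd-party API.
class ThirdPartyClient {
  String lookup(String key) { return key; }
}

class ThirdPartyHolder {
  private static ThirdPartyClient client;   // lives only in the JVM where it is first used

  static synchronized ThirdPartyClient get() {
    if (client == null) {
      client = new ThirdPartyClient();      // load dictionaries/config here, once per executor
    }
    return client;
  }
}

// Inside a transformation, the lambda only references the static holder,
// so nothing non-serializable travels with the task (Java 8 syntax assumed):
//   JavaRDD<String> result = input.map(record -> ThirdPartyHolder.get().lookup(record));

The caveat is the one already raised in the thread: each executor re-creates the object after a restart, so the initialization cost is paid once per JVM rather than once per cluster.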
Re: Error when connecting to Spark SQL via Hive JDBC driver
Hi Ratio - You need more than just hive-jdbc jar. Here are all of the jars that I found were needed. I got this list from https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-RunningtheJDBCSampleCode plus trial and error. [image: Inline image 1] -- Eric On Tue, Jul 7, 2015 at 11:02 AM, ratio gms...@gmx.de wrote: Hi, problem not solved yet. Compiling Spark by myself is no option. I don't have permissions and skills for doing that. Could someone please explain, what exactly is causing the problem? If Spark is distributed via pre-compiled versions, why not to add the corresponding JDBC driver jars? At least Spark SQL is competing for Standard Connectivity: Connect through JDBC or ODBC. Spark SQL includes a server mode with industry standard JDBC and ODBC connectivity. It seems like a great piece of software is shipped with a small lack, making it unusable for a part of the community... Thank you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-connecting-to-Spark-SQL-via-Hive-JDBC-driver-tp23397p23691.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How do we control output part files created by Spark job?
Did you do yourRdd.coalesce(6).saveAsTextFile(), or yourRdd.coalesce(6) followed by yourRdd.saveAsTextFile()? Srikanth On Tue, Jul 7, 2015 at 12:59 PM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi I tried both approaches using df.repartition(6) and df.coalesce(6); it doesn't reduce the part-x files. Even after calling the above methods I still see around 200 small part files of size 20 MB each, which are again ORC files. On Tue, Jul 7, 2015 at 12:52 AM, Sathish Kumaran Vairavelu vsathishkuma...@gmail.com wrote: Try the coalesce function to limit the number of part files On Mon, Jul 6, 2015 at 1:23 PM kachau umesh.ka...@gmail.com wrote: Hi I am having a couple of Spark jobs which process thousands of files every day. File size may vary from MBs to GBs. After finishing a job I usually save using the following code finalJavaRDD.saveAsParquetFile("/path/in/hdfs"); OR dataFrame.write.format("orc").save("/path/in/hdfs") // storing as ORC file as of Spark 1.4 The Spark job creates plenty of small part files in the final output directory. As far as I understand, Spark creates a part file for each partition/task; please correct me if I am wrong. How do we control the number of part files Spark creates? Finally I would like to create a Hive table using these parquet/orc directories, and I heard Hive is slow when we have a large number of small files. Please guide, I am new to Spark. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
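The distinction Srikanth is drawing: coalesce() does not modify the DataFrame/RDD in place, it returns a new one, so the call has to be chained (or its result reassigned) before writing. A hedged Java sketch against the Spark 1.4 API, with illustrative paths:

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

// assuming an existing JavaSparkContext sc; ORC needs a HiveContext in 1.4
HiveContext hiveContext = new HiveContext(sc.sc());
DataFrame df = hiveContext.read().format("orc").load("/path/in/hdfs/in");
df.coalesce(6).write().format("orc").save("/path/in/hdfs/out");   // at most 6 part files

// calling df.coalesce(6); on its own and then writing df still uses the old partitioning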
spark - redshift !!!
Hi, can you help me with how to load data from an S3 bucket into Redshift? If you have sample code, could you please send it to me? Thanks, su
Parallelizing multiple RDD / DataFrame creation in Spark
Say I have a Spark job that looks like the following: def loadTable1() { val table1 = sqlContext.jsonFile("s3://textfiledirectory/") table1.cache().registerTempTable("table1") } def loadTable2() { val table2 = sqlContext.jsonFile("s3://testfiledirectory2/") table2.cache().registerTempTable("table2") } def loadAllTables() { loadTable1() loadTable2() } loadAllTables() How do I parallelize this Spark job so that both tables are created at the same time, in parallel?
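One way to get the two loads going at once is simply to submit them from separate threads on the driver; Spark schedules jobs submitted from different threads concurrently (FIFO by default, fair scheduling if enabled). A hedged Java 8 sketch, with the pool size and paths purely illustrative (in Scala the same thing is usually done by wrapping loadTable1()/loadTable2() in scala.concurrent.Future):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.spark.sql.DataFrame;

// assuming an existing SQLContext named sqlContext
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.submit(() -> {
  DataFrame table1 = sqlContext.jsonFile("s3://textfiledirectory/");
  table1.cache().registerTempTable("table1");
});
pool.submit(() -> {
  DataFrame table2 = sqlContext.jsonFile("s3://testfiledirectory2/");
  table2.cache().registerTempTable("table2");
});
pool.shutdown();
pool.awaitTermination(1, TimeUnit.HOURS);   // wait until both tables are registered before querying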
Why can I not insert into TempTables in Spark SQL?
Why does this not work? Is insert into broken in 1.3.1? val ssc = new StreamingContext(sc, Minutes(10)) val currentStream = ssc.textFileStream("s3://textFileDirectory/") val dayBefore = sqlContext.jsonFile("s3://textFileDirectory/") dayBefore.saveAsParquetFile("/tmp/cache/dayBefore.parquet") val parquetFile = sqlContext.parquetFile("/tmp/cache/dayBefore.parquet") parquetFile.registerTempTable("rideaccepted") currentStream.foreachRDD { rdd => val df = sqlContext.jsonRDD(rdd) df.insertInto("rideaccepted") } ssc.start() Or this? val ssc = new StreamingContext(sc, Minutes(10)) val currentStream = ssc.textFileStream("s3://textFileDirectory/") val dayBefore = sqlContext.jsonFile("s3://textFileDirectory/") dayBefore.registerTempTable("rideaccepted") currentStream.foreachRDD { rdd => val df = sqlContext.jsonRDD(rdd) df.insertInto("rideaccepted") } ssc.start()
Hive UDFs
I know the typical way to apply a Hive UDF to a DataFrame is basically something like: dataframe.selectExpr("reverse(testString) as reversedString") Is there a way to apply the Hive UDF to just a single row and get a row back? Something like: dataframe.first.selectExpr("reverse(testString) as reversedString") Thanks in advance! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Hive-UDFs-tp23707.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark-submit can not resolve spark-hive_2.10
spark-hive is excluded when using --packages, because it can be included in the spark-assembly by adding -Phive during mvn package or sbt assembly. Best, Burak On Tue, Jul 7, 2015 at 8:06 AM, Hao Ren inv...@gmail.com wrote: I want to add spark-hive as a dependence to submit my job, but it seems that spark-submit can not resolve it. $ ./bin/spark-submit \ → --packages org.apache.spark:spark-hive_2.10:1.4.0,org.postgresql:postgresql:9.3-1103-jdbc3,joda-time:joda-time:2.8.1 \ → --class fr.leboncoin.etl.jobs.dwh.AdStateTraceDWHTransform \ → --master spark://localhost:7077 \ Ivy Default Cache set to: /home/invkrh/.ivy2/cache The jars for the packages stored in: /home/invkrh/.ivy2/jars https://repository.jboss.org/nexus/content/repositories/releases/ added as a remote repository with the name: repo-1 :: loading settings :: url = jar:file:/home/invkrh/workspace/scala/spark/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.2.0.jar!/org/apache/ivy/core/settings/ivysettings.xml org.apache.spark#spark-hive_2.10 added as a dependency org.postgresql#postgresql added as a dependency joda-time#joda-time added as a dependency :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0 confs: [default] found org.postgresql#postgresql;9.3-1103-jdbc3 in local-m2-cache found joda-time#joda-time;2.8.1 in central :: resolution report :: resolve 139ms :: artifacts dl 3ms :: modules in use: joda-time#joda-time;2.8.1 from central in [default] org.postgresql#postgresql;9.3-1103-jdbc3 from local-m2-cache in [default] - | |modules|| artifacts | | conf | number| search|dwnlded|evicted|| number|dwnlded| - | default | 2 | 0 | 0 | 0 || 2 | 0 | - :: retrieving :: org.apache.spark#spark-submit-parent confs: [default] 0 artifacts copied, 2 already retrieved (0kB/6ms) Exception in thread main java.lang.NoClassDefFoundError: org/apache/spark/sql/hive/HiveContext at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:633) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.hive.HiveContext at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 7 more Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/07/07 16:57:59 INFO Utils: Shutdown hook called Any help is appreciated. Thank you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-can-not-resolve-spark-hive-2-10-tp23695.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re:
Anand, AFAIK, you will need to change two settings: spark.streaming.unpersist = false // in order for Spark Streaming to not drop the raw RDD data spark.cleaner.ttl = some reasonable value in seconds Also be aware that the lineage of your union RDD will grow with each batch interval. You will need to break lineage often with cache(), and rely on the ttl for clean up. You will probably be in some tricky ground with this approach. A more reliable way would be to do dstream.window(...) for the length of time you want to keep the data and then union that data with your RDD for further processing using transform. Something like: dstream.window(Seconds(900), Seconds(900)).transform(rdd => rdd union otherRdd)... If you need an unbounded number of dstream batch intervals, consider writing the data to secondary storage instead. -kr, Gerard. On Tue, Jul 7, 2015 at 1:34 PM, Anand Nalya anand.na...@gmail.com wrote: Hi, Suppose I have an RDD that is loaded from some file and then I also have a DStream that has data coming from some stream. I want to keep unioning some of the tuples from the DStream into my RDD. For this I can use something like this: var myRDD: RDD[(String, Long)] = sc.fromText... dstream.foreachRDD { rdd => myRDD = myRDD.union(rdd.filter(myfilter)) } My question is: for how long will Spark keep the RDDs underlying the dstream around? Is there some configuration knob that can control that? Regards, Anand
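For anyone following this from the Java API, a rough equivalent of Gerard's window-plus-transform line. The 900-second window and slide are just the values from his example, and otherRdd is assumed to be the RDD loaded from the file:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;

// assuming JavaPairDStream<String, Long> dstream and JavaPairRDD<String, Long> otherRdd
JavaPairDStream<String, Long> combined =
    dstream.window(Durations.seconds(900), Durations.seconds(900))
           .transformToPair(rdd -> rdd.union(otherRdd));
// per batch, combined holds the last 15 minutes of stream data unioned with otherRdd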
sparkr-submit additional R files
Hi all, spark-submit for Python and Java/Scala has --py-files and --jars options for submitting additional files on top of the main application. Is there any such option for sparkr-submit? I know that there is the includePackage() R function to add library dependencies, but can you add other sources that are not R libraries (e.g. additional code repositories)? I really appreciate your help. Thanks, Michael
Re: User Defined Functions - Execution on Clusters
Interesting, thanks for the heads up. On 7/6/15, 7:19 PM, Davies Liu dav...@databricks.com wrote: Currently, Python UDFs run in a Python instances, are MUCH slower than Scala ones (from 10 to 100x). There is JIRA to improve the performance: https://issues.apache.org/jira/browse/SPARK-8632, After that, they will be still much slower than Scala ones (because Python is lower and the overhead for calling Python). On Mon, Jul 6, 2015 at 12:55 PM, Eskilson,Aleksander alek.eskil...@cerner.com wrote: Hi there, I’m trying to get a feel for how User Defined Functions from SparkSQL (as written in Python and registered using the udf function from pyspark.sql.functions) are run behind the scenes. Trying to grok the source it seems that the native Python function is serialized for distribution to the clusters. In practice, it seems to be able to check for other variables and functions defined elsewhere in the namepsace and include those in the function’s serialization. Following all this though, when actually run, are Python interpreter instances on each node brought up to actually run the function against the RDDs, or can the serialized function somehow be run on just the JVM? If bringing up Python instances is the execution model, what is the overhead of PySpark UDFs like as compared to those registered in Scala? Thanks, Alek CONFIDENTIALITY NOTICE This message and any included attachments are from Cerner Corporation and are intended only for the addressee. The information contained in this message is confidential and may constitute inside or non-public information under international, federal, or state securities laws. Unauthorized forwarding, printing, copying, distribution, or use of such information is strictly prohibited and may be unlawful. If you are not the addressee, please promptly delete this message and notify the sender of the delivery error by e-mail or you may call Cerner's corporate offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Question about master memory requirement and GraphX pagerank performance !
Hi all, I am fairly new to spark and wonder if you can help me. I am exploring GraphX/Spark by running the pagerank example on a medium size graph (12 GB) using this command: My cluster is 1+16 machines, the master has 15 GB memory and each worker has 30 GB. The master has 2 cores and each worker has 4 cores. /home/ubuntu/spark-1.3.0/bin/spark-submit --master spark://Master IP:7077 --class org.apache.spark.examples.graphx.Analytics /home/ubuntu/spark-1.3.0/examples/target/scala-2.10/spark-examples-1.3.0-hadoop1.0.4.jar pagerank /user/ubuntu/input/dataset --numEPart=64 --output=/user/ubuntu/spark/16_pagerank --numIter=30 I have two questions: 1- When I set SPARK_EXECUTOR_MEMORY=25000M, I received errors because master cannot allocate this memory since the launched task includes -Xms 25000M. Based on my understanding, the master does not do any computation and this executor memory is only required in the worker machines. Why the application cannot start without allocating all required memory in the master as well as in all workers. ! 2- I changed the executor memory to 15 GB and the application worked fine. However, it did not finish the thirty iterations after 7 hours. There is one that was taking 4+ hours, and its input is 400+ GB. I must be doing something wrong, any comment? -- Thanks, -Khaled Ammar www.khaledammar.com
RE:
spark.streaming.unpersist = false // in order for Spark Streaming to not drop the raw RDD data spark.cleaner.ttl = some reasonable value in seconds Why is the above suggested, given that the persist/cache operation on the constantly unionized Batch RDD will have to be invoked anyway (after every union with a DStream RDD)? Besides, it will result in DStream RDDs accumulating in RAM unnecessarily for the duration of the TTL. Re "A more reliable way would be to do dstream.window(...) for the length of time you want to keep the data and then union that data with your RDD for further processing using transform." I think the actual requirement here is picking up and adding Specific Messages from EVERY DStream RDD to the Batch RDD, rather than "preserving" messages from a specific sliding window and adding them to the Batch RDD. This should be defined as the Frequency of Updates to the Batch RDD, and then using dstream.window() equal to that frequency. Can you also elaborate on why you consider the dstream.window approach more "reliable"? From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Tuesday, July 7, 2015 12:56 PM To: Anand Nalya Cc: spark users Subject: Re: Anand, AFAIK, you will need to change two settings: spark.streaming.unpersist = false // in order for Spark Streaming to not drop the raw RDD data spark.cleaner.ttl = some reasonable value in seconds Also be aware that the lineage of your union RDD will grow with each batch interval. You will need to break lineage often with cache(), and rely on the ttl for clean up. You will probably be in some tricky ground with this approach. A more reliable way would be to do dstream.window(...) for the length of time you want to keep the data and then union that data with your RDD for further processing using transform. Something like: dstream.window(Seconds(900), Seconds(900)).transform(rdd => rdd union otherRdd)... If you need an unbounded number of dstream batch intervals, consider writing the data to secondary storage instead. -kr, Gerard. On Tue, Jul 7, 2015 at 1:34 PM, Anand Nalya anand.na...@gmail.com wrote: Hi, Suppose I have an RDD that is loaded from some file and then I also have a DStream that has data coming from some stream. I want to keep unioning some of the tuples from the DStream into my RDD. For this I can use something like this: var myRDD: RDD[(String, Long)] = sc.fromText... dstream.foreachRDD { rdd => myRDD = myRDD.union(rdd.filter(myfilter)) } My question is: for how long will Spark keep the RDDs underlying the dstream around? Is there some configuration knob that can control that? Regards, Anand
Re: How to solve ThreadException in Apache Spark standalone Java Application
Can you try adding sc.stop at the end of your program? looks like its having a hard-time closing off sparkcontext. Thanks Best Regards On Tue, Jul 7, 2015 at 4:08 PM, Hafsa Asif hafsa.a...@matchinguu.com wrote: Hi, I run the following simple Java spark standalone app with maven command exec:java -Dexec.mainClass=SimpleApp public class SimpleApp { public static void main(String[] args) { System.out.println(Reading and Connecting with Spark.); try { String logFile = /home/asif/spark-1.4.0/README.md; // Should be some file on your system SparkConf conf = new SparkConf().setAppName(Simple Application).setMaster(local); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDDString logData = sc.textFile(logFile).cache(); long numAs = logData.filter(new FunctionString, Boolean() { public Boolean call(String s) { return s.contains(a); } }).count(); long numBs = logData.filter(new FunctionString, Boolean() { public Boolean call(String s) { return s.contains(b); } }).count(); System.out.println(Lines with a: + numAs + , lines with b: + numBs); } catch(Exception e){ System.out.println (Error in connecting with Spark); } } } Well, it builds successfully and also giving results but with thread exception. What is the reason of the thread exception and how to solve it in standalone mode because in spark shell with spark commit command, it is running fine. Log trace is: [INFO] Scanning for projects... [INFO] [INFO] [INFO] Building standAloneSparkApp 1.0-SNAPSHOT [INFO] [INFO] [INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ standAloneSparkApp --- Reading and Connecting with Spark. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/07/07 03:28:34 INFO SparkContext: Running Spark version 1.4.0 15/07/07 03:28:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/07/07 03:28:34 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.116.133 instead (on interface eth0) 15/07/07 03:28:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 15/07/07 03:28:34 INFO SecurityManager: Changing view acls to: asif 15/07/07 03:28:34 INFO SecurityManager: Changing modify acls to: asif 15/07/07 03:28:34 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(asif); users with modify permissions: Set(asif) 15/07/07 03:28:35 INFO Slf4jLogger: Slf4jLogger started 15/07/07 03:28:36 INFO Remoting: Starting remoting 15/07/07 03:28:36 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.116.133:34863] 15/07/07 03:28:36 INFO Utils: Successfully started service 'sparkDriver' on port 34863. 15/07/07 03:28:36 INFO SparkEnv: Registering MapOutputTracker 15/07/07 03:28:36 INFO SparkEnv: Registering BlockManagerMaster 15/07/07 03:28:36 INFO DiskBlockManager: Created local directory at /tmp/spark-b8a40d9c-d470-404e-bcd7-5763791ffeca/blockmgr-422d1fcb-bda4-4a2a-93a6-4d49c28cdf28 15/07/07 03:28:36 INFO MemoryStore: MemoryStore started with capacity 534.5 MB 15/07/07 03:28:36 INFO HttpFileServer: HTTP File server directory is /tmp/spark-b8a40d9c-d470-404e-bcd7-5763791ffeca/httpd-6f024c0f-60c4-413b-bb29-eee93e697651 15/07/07 03:28:36 INFO HttpServer: Starting HTTP Server 15/07/07 03:28:36 INFO Utils: Successfully started service 'HTTP file server' on port 46189. 
15/07/07 03:28:37 INFO SparkEnv: Registering OutputCommitCoordinator 15/07/07 03:28:37 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/07/07 03:28:37 INFO SparkUI: Started SparkUI at http://192.168.116.133:4040 15/07/07 03:28:37 INFO Executor: Starting executor ID driver on host localhost 15/07/07 03:28:38 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 36884. 15/07/07 03:28:38 INFO NettyBlockTransferService: Server created on 36884 15/07/07 03:28:38 INFO BlockManagerMaster: Trying to register BlockManager 15/07/07 03:28:38 INFO BlockManagerMasterEndpoint: Registering block manager localhost:36884 with 534.5 MB RAM, BlockManagerId(driver, localhost, 36884) 15/07/07 03:28:38 INFO BlockManagerMaster: Registered BlockManager 15/07/07 03:28:39 INFO MemoryStore: ensureFreeSpace(110248) called with curMem=0, maxMem=560497950 15/07/07 03:28:39 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 107.7 KB, free 534.4 MB) 15/07/07 03:28:40 INFO MemoryStore: ensureFreeSpace(10090)
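In code, the shape Akhil is suggesting is roughly the following (a hedged sketch of the same app trimmed to one count, using Java 8 lambda syntax; the README path is the one from the original mail). Stopping the context before main() returns lets Spark's non-daemon threads shut down cleanly instead of being torn down by the exec-maven-plugin, which is a likely source of the thread exception:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SimpleApp {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    try {
      long numAs = sc.textFile("/home/asif/spark-1.4.0/README.md").cache()
                     .filter(s -> s.contains("a")).count();
      System.out.println("Lines with a: " + numAs);
    } finally {
      sc.stop();   // close the context explicitly before exiting
    }
  }
}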
Re: How to implement top() and filter() on object List for JavaRDD
Here's a simplified example: SparkConf conf = new SparkConf().setAppName("Sigmoid").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); List<String> user = new ArrayList<String>(); user.add("Jack"); user.add("Jill"); user.add("Jack"); user.add("Bob"); JavaRDD<String> userRDD = sc.parallelize(user); // Now let's filter all Jacks! JavaRDD<String> jackRDD = userRDD.filter(new Function<String, Boolean>() { public Boolean call(String v1) throws Exception { return v1.equals("Jack"); } }); // Let's print all Jacks! for (String s : jackRDD.collect()) { System.out.println(s); } Thanks Best Regards On Tue, Jul 7, 2015 at 5:39 PM, Hafsa Asif hafsa.a...@matchinguu.com wrote: I have also tried this stupid code snippet, only thinking that it may even compile: Function1<User, Object> FILTER_USER = new AbstractFunction1<User, Object>() { public Object apply(User user){ return user; } }; FILTER_USER is fine but cannot be applied to the following two options, with no results: User[] filterUsr = (User[]) rdd.rdd().retag(User.class).filter(FILTER_USER); User userFilter = (User) rdd.rdd().filter(FILTER_USER); Giving issue: Inconvertible types I really need proper code related to this issue. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-top-and-filter-on-object-List-for-JavaRDD-tp23669p23677.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to implement top() and filter() on object List for JavaRDD
I have also tried this stupid code snippet, only thinking that it may even compile: Function1<User, Object> FILTER_USER = new AbstractFunction1<User, Object>() { public Object apply(User user){ return user; } }; FILTER_USER is fine but cannot be applied to the following two options, with no results: User[] filterUsr = (User[]) rdd.rdd().retag(User.class).filter(FILTER_USER); User userFilter = (User) rdd.rdd().filter(FILTER_USER); Giving issue: Inconvertible types I really need proper code related to this issue. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-top-and-filter-on-object-List-for-JavaRDD-tp23669p23677.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Please add the Cincinnati spark meetup to the list of meet ups
http://www.meetup.com/Cincinnati-Apache-Spark-Meetup/ Thanks. Darin.
Re:
Evo, I'd let the OP clarify the question. I'm not in position of clarifying his requirements beyond what's written on the question. Regarding window vs mutable union: window is a well-supported feature that accumulates messages over time. The mutable unioning of RDDs is bound to operational trouble as there're no warranties tied to data preservation and it's unclear how one can produce 'cuts' of that union ready to be served for some process/computation. Intuitively, it will 'explode' at some point. -kr, Gerard. On Tue, Jul 7, 2015 at 2:06 PM, Evo Eftimov evo.efti...@isecc.com wrote: spark.streaming.unpersist = false // in order for SStreaming to not drop the raw RDD data spark.cleaner.ttl = some reasonable value in seconds why is the above suggested provided the persist/vache operation on the constantly unioniuzed Batch RDD will have to be invoked anyway (after every union with DStream RDD), besides it will result in DStraeam RDDs accumulating in RAM unncesesarily for the duration of TTL re “A more reliable way would be to do dstream.window(...) for the length of time you want to keep the data and then union that data with your RDD for further processing using transform.” I think the actual requirement here is picking up and adding Specific Messages from EVERY DStream RDD to the Batch RDD rather than “preserving” messages from specific sliding window and adding them to the Batch RDD This should be defined as the Frequency of Updates to the Batch RDD and then using dstream.window() equal to that frequency Can you also elaborate why you consider the dstream.window approach more “reliable” *From:* Gerard Maas [mailto:gerard.m...@gmail.com] *Sent:* Tuesday, July 7, 2015 12:56 PM *To:* Anand Nalya *Cc:* spark users *Subject:* Re: Anand, AFAIK, you will need to change two settings: spark.streaming.unpersist = false // in order for SStreaming to not drop the raw RDD data spark.cleaner.ttl = some reasonable value in seconds Also be aware that the lineage of your union RDD will grow with each batch interval. You will need to break lineage often with cache(), and rely on the ttl for clean up. You will probably be in some tricky ground with this approach. A more reliable way would be to do dstream.window(...) for the length of time you want to keep the data and then union that data with your RDD for further processing using transform. Something like: dstream.window(Seconds(900), Seconds(900)).transform(rdd = rdd union otherRdd)... If you need an unbound amount of dstream batch intervals, considering writing the data to secondary storage instead. -kr, Gerard. On Tue, Jul 7, 2015 at 1:34 PM, Anand Nalya anand.na...@gmail.com wrote: Hi, Suppose I have an RDD that is loaded from some file and then I also have a DStream that has data coming from some stream. I want to keep union some of the tuples from the DStream into my RDD. For this I can use something like this: var myRDD: RDD[(String, Long)] = sc.fromText... dstream.foreachRDD{ rdd = myRDD = myRDD.union(rdd.filter(myfilter)) } My questions is that for how long spark will keep RDDs underlying the dstream around? Is there some configuratoin knob that can control that? Regards, Anand
Re: How to implement top() and filter() on object List for JavaRDD
I would suggest you take alook to DataFrames. Also, I do not think you should implement comparators for user class as a whole, rather you should get the attribute to sort/compar on and delete sorting to data type of inherent attribute. Eg. sorting can be done by name and if so, it should be string sorting. In that case, you can create another KeyValue RDD whr key is the chosen attribute and value is user object and do a sortByKey. On Tue, Jul 7, 2015 at 10:09 PM, Hafsa Asif hafsa.a...@matchinguu.com wrote: I have also tried this stupid code snippet, only thinking that it may even compile code Function1User, Object FILTER_USER = new AbstractFunction1User, Object () { public Object apply(User user){ return user; } }; FILTER_USER is fine but cannot be applied to the following two options but no results: User[] filterUsr = (User[])rdd.rdd().retag(User.class).filter(FILTER_USER); User userFilter = (User) rdd.rdd().filter(FILTER_USER); Giving issue: Inconertable types I really need proper code related to this issue. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-top-and-filter-on-object-List-for-JavaRDD-tp23669p23677.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Best Regards, Ayan Guha
Re: Master doesn't start, no logs
Yes, I do set $SPARK_MASTER_IP. I suspect a more internal issue, maybe due to multiple spark/hdfs instances having successively run on the same machine? -- Henri Maxime Demoulin 2015-07-07 4:10 GMT-04:00 Akhil Das ak...@sigmoidanalytics.com: Strange. What are you having in $SPARK_MASTER_IP? It may happen that it is not able to bind to the given ip but again it should be in the logs. Thanks Best Regards On Tue, Jul 7, 2015 at 12:54 AM, maxdml maxdemou...@gmail.com wrote: Hi, I've been compiling spark 1.4.0 with SBT, from the source tarball available on the official website. I cannot run spark's master, even tho I have built and run several other instance of spark on the same machine (spark 1.3, master branch, pre built 1.4, ...) /starting org.apache.spark.deploy.master.Master, logging to /mnt/spark-1.4.0/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-xx.out failed to launch org.apache.spark.deploy.master.Master: full log in /mnt/spark-1.4.0/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-xx.out/ But the log file is empty. After digging up to ./bin/spark-class, and finally trying to start the master with: ./bin/spark-class org.apache.spark.deploy.master.Master --host 155.99.144.31 I still have the same result. Here is the strace output for this command: http://pastebin.com/bkJVncBm I'm using a 64 bit Xeon, CentOS 6.5, spark 1.4.0, compiled against hadoop 2.5.2 Any idea? :-) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Master-doesn-t-start-no-logs-tp23651.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RECEIVED SIGNAL 15: SIGTERM
I am still receiving these weird sigterms on the executors. The driver claims it lost the executor, the executor receives a SIGTERM (from whom???) It doesn't seem a memory related issue though increasing memory takes the job a bit further or completes it. But why? there is no memory pressure on neither driver nor executor. And nothing in the logs indicating so. driver: 15/07/07 10:47:04 INFO scheduler.TaskSetManager: Starting task 14762.0 in stage 0.0 (TID 14762, cruncher03.stratified, PROCESS_LOCAL, 13069 bytes) 15/07/07 10:47:04 INFO scheduler.TaskSetManager: Finished task 14517.0 in stage 0.0 (TID 14517) in 15950 ms on cruncher03.stratified (14507/42240) 15/07/07 10:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. cruncher05.stratified:32976 15/07/07 10:47:04 ERROR cluster.YarnClusterScheduler: Lost executor 1 on cruncher05.stratified: remote Rpc client disassociated 15/07/07 10:47:04 INFO scheduler.TaskSetManager: Re-queueing tasks for 1 from TaskSet 0.0 15/07/07 10:47:04 INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down. cruncher05.stratified:32976 15/07/07 10:47:04 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@cruncher05.stratified:32976] has failed, address is now gated for [5000] ms. Reason is: [Disassociated]. 15/07/07 10:47:04 WARN scheduler.TaskSetManager: Lost task 14591.0 in stage 0.0 (TID 14591, cruncher05.stratified): ExecutorLostFailure (executor 1 lost) gc log for driver, it doesnt look like it run outofmem: 2015-07-07T10:45:19.887+0100: [GC (Allocation Failure) 1764131K-1391211K(3393024K), 0.0102839 secs] 2015-07-07T10:46:00.934+0100: [GC (Allocation Failure) 1764971K-1391867K(3405312K), 0.0099062 secs] 2015-07-07T10:46:45.252+0100: [GC (Allocation Failure) 1782011K-1392596K(3401216K), 0.0167572 secs] executor: 15/07/07 10:47:03 INFO executor.Executor: Running task 14750.0 in stage 0.0 (TID 14750) 15/07/07 10:47:03 INFO spark.CacheManager: Partition rdd_493_14750 not found, computing it 15/07/07 10:47:03 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM 15/07/07 10:47:03 INFO storage.DiskBlockManager: Shutdown hook called executor gc log (no outofmem as it seems): 2015-07-07T10:47:02.332+0100: [GC (GCLocker Initiated GC) 24696750K-23712939K(33523712K), 0.0416640 secs] 2015-07-07T10:47:02.598+0100: [GC (GCLocker Initiated GC) 24700520K-23722043K(33523712K), 0.0391156 secs] 2015-07-07T10:47:02.862+0100: [GC (Allocation Failure) 24709182K-23726510K(33518592K), 0.0390784 secs] -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RECEIVED-SIGNAL-15-SIGTERM-tp23668.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to implement top() and filter() on object List for JavaRDD
Hi, I have an object list of Users and I want to implement top() and filter() methods on the object list. Let me explain the whole scenario: 1. I have a User object list named usersList. I fill it while iterating over a record set. User user = new User(); user.setUserName(record.getValue("username").toString()); user.setPassword(record.getValue("password").toString()); usersList.add(user); 2. I successfully implemented the first() and take() methods like this, and they are giving results: JavaRDD<User> rdd = context.parallelize(usersList); /* Getting First User */ User firstUsr = (User) rdd.rdd().first(); System.out.println("First User: " + firstUsr.getUserName()); /* Getting Take 2 Users */ User[] takeUsr = (User[]) rdd.rdd().retag(User.class).take(2); for (int ctusr = 0; ctusr < takeUsr.length; ctusr++) { System.out.println("User from take list: " + takeUsr[ctusr].getUserName()); } 3. I want to implement top() in the same way. I tried it but it requires two arguments (num, Ordering<User> ord). I do not understand how to implement Ordering. I tried two possibilities: (i) User[] topUsr = (User[]) rdd.rdd().retag(User.class).top(1, null); (ii) User[] topUsr = (User[]) rdd.rdd().retag(User.class).top(1, new Ordering<User>() { @Override public Some tryCompare(User user, User t1) { return null; } @Override public int compare(User usr1, User usr2) { usr1 = new User(); usr2 = new User(); for (int u = 0; u < usersList.size(); u++) { usr1 = (User) usersList.get(u); usr2 = (User) usersList.get(u); } String userName1 = usr1.getUserName(); String userName2 = usr2.getUserName(); System.out.println("userName1: " + userName1); //ascending order return userName1.compareTo(userName2); //descending order //return userName2.compareTo(userName1); } @Override public boolean lteq(User user, User t1) { return false; } @Override public boolean gteq(User user, User t1) { return false; } @Override public boolean lt(User user, User t1) { return false; } @Override public boolean gt(User user, User t1) { return false; } @Override public boolean equiv(User user, User t1) { return false; } @Override public User max(User user, User t1) { return null; } @Override public User min(User user, User t1) { return null; } @Override public Ordering<User> reverse() { return null; } @Override public <U> Ordering<U> on(Function1<U, User> function1) { return null; } @Override public Ops mkOrderingOps(User user) { return null; } } ); BOTH ARE GIVING ME A NULLPOINTER EXCEPTION. Kindly guide me on how to properly implement it. 4. I also want to implement filter() in the same way: User[] filterUsr = (User[]) rdd.rdd().retag(User.class).filter(Function1<User, Object>); but I could not write the Function1 for it. Suppose I want to filter out the user whose name is Bob. How can it be done using an object list and User objects? Can you give me a code sample for Function1? I really appreciate your help and guidance. Thanks, Hafsa -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-top-and-filter-on-object-List-for-JavaRDD-tp23669.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
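For what it's worth, the Java API exposes top() and filter() directly on JavaRDD, so there is no need to drop down to rdd.rdd() or to implement scala.math.Ordering; a java.util.Comparator that is also Serializable is enough. A hedged sketch reusing the poster's User class and rdd variable (Java 8 lambda for the filter):

import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;

class UserNameComparator implements Comparator<User>, Serializable {
  @Override
  public int compare(User a, User b) {
    return a.getUserName().compareTo(b.getUserName());
  }
}

// assuming JavaRDD<User> rdd = context.parallelize(usersList);
// top() returns the largest elements according to the comparator,
// here the user whose name sorts last alphabetically:
List<User> topUsers = rdd.top(1, new UserNameComparator());
System.out.println("Top user: " + topUsers.get(0).getUserName());

// filter(): keep everyone except Bob
JavaRDD<User> withoutBob = rdd.filter(u -> !"Bob".equals(u.getUserName()));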
[SPARK-SQL] libgplcompression.so already loaded in another classloader
Hi, all I found an Exception when using spark-sql java.lang.UnsatisfiedLinkError: Native Library /data/lib/native/libgplcompression.so already loaded in another classloader ... I set spark.sql.hive.metastore.jars=. in file spark-defaults.conf It does not happen every time. Who knows why? Spark version: 1.4.0 Hadoop version: 2.2.0
How to solve ThreadException in Apache Spark standalone Java Application
Hi, I run the following simple Java spark standalone app with maven command exec:java -Dexec.mainClass=SimpleApp public class SimpleApp { public static void main(String[] args) { System.out.println(Reading and Connecting with Spark.); try { String logFile = /home/asif/spark-1.4.0/README.md; // Should be some file on your system SparkConf conf = new SparkConf().setAppName(Simple Application).setMaster(local); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDDString logData = sc.textFile(logFile).cache(); long numAs = logData.filter(new FunctionString, Boolean() { public Boolean call(String s) { return s.contains(a); } }).count(); long numBs = logData.filter(new FunctionString, Boolean() { public Boolean call(String s) { return s.contains(b); } }).count(); System.out.println(Lines with a: + numAs + , lines with b: + numBs); } catch(Exception e){ System.out.println (Error in connecting with Spark); } } } Well, it builds successfully and also giving results but with thread exception. What is the reason of the thread exception and how to solve it in standalone mode because in spark shell with spark commit command, it is running fine. Log trace is: [INFO] Scanning for projects... [INFO] [INFO] [INFO] Building standAloneSparkApp 1.0-SNAPSHOT [INFO] [INFO] [INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ standAloneSparkApp --- Reading and Connecting with Spark. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/07/07 03:28:34 INFO SparkContext: Running Spark version 1.4.0 15/07/07 03:28:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/07/07 03:28:34 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.116.133 instead (on interface eth0) 15/07/07 03:28:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 15/07/07 03:28:34 INFO SecurityManager: Changing view acls to: asif 15/07/07 03:28:34 INFO SecurityManager: Changing modify acls to: asif 15/07/07 03:28:34 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(asif); users with modify permissions: Set(asif) 15/07/07 03:28:35 INFO Slf4jLogger: Slf4jLogger started 15/07/07 03:28:36 INFO Remoting: Starting remoting 15/07/07 03:28:36 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.116.133:34863] 15/07/07 03:28:36 INFO Utils: Successfully started service 'sparkDriver' on port 34863. 15/07/07 03:28:36 INFO SparkEnv: Registering MapOutputTracker 15/07/07 03:28:36 INFO SparkEnv: Registering BlockManagerMaster 15/07/07 03:28:36 INFO DiskBlockManager: Created local directory at /tmp/spark-b8a40d9c-d470-404e-bcd7-5763791ffeca/blockmgr-422d1fcb-bda4-4a2a-93a6-4d49c28cdf28 15/07/07 03:28:36 INFO MemoryStore: MemoryStore started with capacity 534.5 MB 15/07/07 03:28:36 INFO HttpFileServer: HTTP File server directory is /tmp/spark-b8a40d9c-d470-404e-bcd7-5763791ffeca/httpd-6f024c0f-60c4-413b-bb29-eee93e697651 15/07/07 03:28:36 INFO HttpServer: Starting HTTP Server 15/07/07 03:28:36 INFO Utils: Successfully started service 'HTTP file server' on port 46189. 15/07/07 03:28:37 INFO SparkEnv: Registering OutputCommitCoordinator 15/07/07 03:28:37 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
15/07/07 03:28:37 INFO SparkUI: Started SparkUI at http://192.168.116.133:4040 15/07/07 03:28:37 INFO Executor: Starting executor ID driver on host localhost 15/07/07 03:28:38 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 36884. 15/07/07 03:28:38 INFO NettyBlockTransferService: Server created on 36884 15/07/07 03:28:38 INFO BlockManagerMaster: Trying to register BlockManager 15/07/07 03:28:38 INFO BlockManagerMasterEndpoint: Registering block manager localhost:36884 with 534.5 MB RAM, BlockManagerId(driver, localhost, 36884) 15/07/07 03:28:38 INFO BlockManagerMaster: Registered BlockManager 15/07/07 03:28:39 INFO MemoryStore: ensureFreeSpace(110248) called with curMem=0, maxMem=560497950 15/07/07 03:28:39 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 107.7 KB, free 534.4 MB) 15/07/07 03:28:40 INFO MemoryStore: ensureFreeSpace(10090) called with curMem=110248, maxMem=560497950 15/07/07 03:28:40 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.9 KB, free 534.4 MB) 15/07/07 03:28:40 INFO BlockManagerInfo: Added broadcast_0_piece0 in
Re: Spark Standalone Cluster - Slave not connecting to Master
Hi MorEru, the same problem occurred for me. I had to change the version of the Maven dependency from spark-core_2.11 to spark-core_2.10 and it worked. Thanks Himanshu -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Standalone-Cluster-Slave-not-connecting-to-Master-tp23572p23672.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
[no subject]
Hi, Suppose I have an RDD that is loaded from some file and then I also have a DStream that has data coming from some stream. I want to keep unioning some of the tuples from the DStream into my RDD. For this I can use something like this: var myRDD: RDD[(String, Long)] = sc.fromText... dstream.foreachRDD { rdd => myRDD = myRDD.union(rdd.filter(myfilter)) } My question is: for how long will Spark keep the RDDs underlying the dstream around? Is there some configuration knob that can control that? Regards, Anand
How to solve ThreadException in Apache Spark standalone Java Application
Hi, I run the following simple Java spark standalone app with maven command exec:java -Dexec.mainClass=SimpleApp public class SimpleApp { public static void main(String[] args) { System.out.println(Reading and Connecting with Spark.); try { String logFile = /home/asif/spark-1.4.0/README.md; // Should be some file on your system SparkConf conf = new SparkConf().setAppName(Simple Application).setMaster(local); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDDString logData = sc.textFile(logFile).cache(); long numAs = logData.filter(new FunctionString, Boolean() { public Boolean call(String s) { return s.contains(a); } }).count(); long numBs = logData.filter(new FunctionString, Boolean() { public Boolean call(String s) { return s.contains(b); } }).count(); System.out.println(Lines with a: + numAs + , lines with b: + numBs); } catch(Exception e){ System.out.println (Error in connecting with Spark); } } } Well, it builds successfully and also giving results but with thread exception. What is the reason of the thread exception and how to solve it in standalone mode because in spark shell with spark commit command, it is running fine. Log trace is: [INFO] Scanning for projects... [INFO] [INFO] [INFO] Building standAloneSparkApp 1.0-SNAPSHOT [INFO] [INFO] [INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ standAloneSparkApp --- Reading and Connecting with Spark. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/07/07 03:28:34 INFO SparkContext: Running Spark version 1.4.0 15/07/07 03:28:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 15/07/07 03:28:34 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.***.***.*** instead (on interface eth0) 15/07/07 03:28:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 15/07/07 03:28:34 INFO SecurityManager: Changing view acls to: myusername 15/07/07 03:28:34 INFO SecurityManager: Changing modify acls to: myusername 15/07/07 03:28:34 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(myusername); users with modify permissions: Set(myusername) 15/07/07 03:28:35 INFO Slf4jLogger: Slf4jLogger started 15/07/07 03:28:36 INFO Remoting: Starting remoting 15/07/07 03:28:36 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.***.***.***:*] 15/07/07 03:28:36 INFO Utils: Successfully started service 'sparkDriver' on port 34863. 15/07/07 03:28:36 INFO SparkEnv: Registering MapOutputTracker 15/07/07 03:28:36 INFO SparkEnv: Registering BlockManagerMaster 15/07/07 03:28:36 INFO MemoryStore: MemoryStore started with capacity 534.5 MB 15/07/07 03:28:36 INFO HttpFileServer: HTTP File server directory is /tmp/spark-b8a40d9c-d470-404e-bcd7-5763791ffeca/httpd-6f024c0f-60c4-413b-bb29-eee93e697651 15/07/07 03:28:36 INFO HttpServer: Starting HTTP Server 15/07/07 03:28:36 INFO Utils: Successfully started service 'HTTP file server' on port 46189. 15/07/07 03:28:37 INFO SparkEnv: Registering OutputCommitCoordinator 15/07/07 03:28:37 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/07/07 03:28:37 INFO SparkUI: Started SparkUI at http://192.***.***.***: 15/07/07 03:28:37 INFO Executor: Starting executor ID driver on host localhost 15/07/07 03:28:38 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port *. 
15/07/07 03:28:38 INFO NettyBlockTransferService: Server created on * 15/07/07 03:28:38 INFO BlockManagerMaster: Trying to register BlockManager 15/07/07 03:28:38 INFO BlockManagerMasterEndpoint: Registering block manager localhost:*with 534.5 MB RAM, BlockManagerId(driver, localhost, *) 15/07/07 03:28:38 INFO BlockManagerMaster: Registered BlockManager 15/07/07 03:28:39 INFO MemoryStore: ensureFreeSpace(***) called with curMem=0, maxMem=560497950 15/07/07 03:28:39 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 107.7 KB, free 534.4 MB) 15/07/07 03:28:40 INFO MemoryStore: ensureFreeSpace(10090) called with curMem=110248, maxMem=560497950 15/07/07 03:28:40 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 9.9 KB, free 534.4 MB) 15/07/07 03:28:40 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:36884 (size: 9.9 KB, free: 534.5 MB) 15/07/07 03:28:40 INFO SparkContext: Created broadcast 0 from textFile at SimpleApp.java:19 15/07/07 03:28:40 INFO FileInputFormat: Total input paths to
Re: How to implement top() and filter() on object List for JavaRDD
Thank u for your quick response. But, I tried this and get the error as shown in pic error.jpg http://apache-spark-user-list.1001560.n3.nabble.com/file/n23676/error.jpg -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-top-and-filter-on-object-List-for-JavaRDD-tp23669p23676.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: The auxService:spark_shuffle does not exist
Did you enable the dynamic resource allocation ? You can refer to this page for how to configure spark shuffle service for yarn. https://spark.apache.org/docs/1.4.0/job-scheduling.html On Tue, Jul 7, 2015 at 10:55 PM, roy rp...@njit.edu wrote: we tried --master yarn-client with no different result. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/The-auxService-spark-shuffle-does-not-exist-tp23662p23689.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Best Regards Jeff Zhang
RE: Hive UDFs
dataframe.limit(1).selectExpr("xxx").collect()? -Original Message- From: chrish2312 [mailto:c...@palantir.com] Sent: Wednesday, July 8, 2015 6:20 AM To: user@spark.apache.org Subject: Hive UDFs I know the typical way to apply a Hive UDF to a DataFrame is basically something like: dataframe.selectExpr("reverse(testString) as reversedString") Is there a way to apply the Hive UDF to just a single row and get a row back? Something like: dataframe.first.selectExpr("reverse(testString) as reversedString") Thanks in advance! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Hive-UDFs-tp23707.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
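The same one-liner through the Java API, assuming a DataFrame df with a string column named testString (both names taken from the question):

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;

// take one row, apply the Hive UDF via selectExpr, and pull the value back to the driver
Row row = df.limit(1).selectExpr("reverse(testString) AS reversedString").first();
String reversed = row.getString(0);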
Re: SparkSQL OOM issue
Hi Shawn, Thanks a lot, that's actually the last parameter we overlooked!! I'm able to run the same SQL on Spark now if I set spark.driver.memory larger, thanks again!! -- Best Regards, Felicia Shann 單師涵 +886-3-5636688 Ext. 7124300 From: Xiaoyu Ma hzmaxiaoyu@corp.netease.com Date: 2015/07/07 下午 04:03 To: shsh...@tsmc.com Cc: user@spark.apache.org, mike_s...@tsmc.com, linc...@tsmc.com Subject: Re: SparkSQL OOM issue Hi, Where did the OOM happen? In the driver or an executor? Sometimes the SparkSQL driver OOMs on tables with a large number of partitions. If so, you might want to increase it in spark-defaults.conf: spark.driver.memory Shawn On Jul 7, 2015, at 3:58 PM, shsh...@tsmc.com wrote: Dear all, We've tried to use SparkSQL to do an insert from table A to table B, where, using the exact same SQL script, Hive is able to finish it but Spark 1.3.1 always ends with an OOM issue; we tried several configurations including: --executor-cores 2 --num-executors 300 --executor-memory 7g sconf.set("spark.storage.memoryFraction", "0") but none of them changes the resulting error: java.lang.OutOfMemoryError: GC overhead limit exceeded Is there any other configuration we can make? Thanks! --- TSMC PROPERTY This email communication (and any attachments) is proprietary information for the sole use of its intended recipient. Any unauthorized review, use or distribution by anyone other than the intended recipient is strictly prohibited. If you are not the intended recipient, please notify the sender by replying to this email, and then delete this email and any copies of it immediately. Thank you. --- - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to submit streaming application and exit
I'm writing a streaming application and want to use spark-submit to submit it to a YARN cluster. I'd like to submit it in a client node and exit spark-submit after the application is running. Is it possible?
RE: Hibench build fail
Hi Hui, Could you please add more description (about the failure) in the HiBench GitHub issues? HiBench works with Spark 1.2 and above. Thank you Best Regards, Grace (Huang Jie) From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: Wednesday, July 8, 2015 12:50 AM To: 罗辉 Cc: user; Huang, Jie Subject: Re: Hibench build fail bq. Need I specify my spark version Looks like the build used 1.4.0-SNAPSHOT. Please use the 1.4.0 release. Cheers On Mon, Jul 6, 2015 at 11:50 PM, luohui20...@sina.com wrote: Hi Grace, recently I am trying HiBench to evaluate my Spark cluster, however I got a problem building HiBench, would you help take a look? Thanks. It fails at building Sparkbench, and you may check the attached pic for more info. My Spark version: 1.3.1, Hadoop version: 2.7.0, HiBench version: 4.0, Python 2.6.6. It is reported that the build failed for spark1.4 and MR1, which I didn't install in my cluster. Need I specify my Spark version and Hadoop version when running bin/build-all.sh? Thanks. Thanks & Best regards! San.Luo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: spark - redshift !!!
Hi, I have done a lot of EMR-S3-Redshift using Redshift COPY, haven't done any from Spark yet but I plan on doing it soon and have been doing some research. Take a look at this article - Best Practices for Micro-Batch Loading on Amazon Redshift https://blogs.aws.amazon.com/bigdata/post/Tx2ANLN1PGELDJU/Best-Practices-for-Micro-Batch-Loading-on-Amazon-Redshift Thanks Pete On Tue, Jul 7, 2015 at 6:57 PM, spark user spark_u...@yahoo.com.invalid wrote: Hi Can you help me how to load data from s3 bucket to redshift , if you gave sample code can you pls send me Thanks su
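The pattern that article describes boils down to: write the files to S3 (for example with saveAsTextFile from Spark), then issue a Redshift COPY over JDBC. A hedged Java sketch of the COPY step only; the endpoint, table, bucket, delimiter and credentials are all placeholders, and it assumes the PostgreSQL/Redshift JDBC driver is on the classpath:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

// run inside a method that declares `throws SQLException`
try (Connection conn = DriverManager.getConnection(
         "jdbc:postgresql://my-cluster.xxxx.redshift.amazonaws.com:5439/mydb", "dbuser", "dbpass");
     Statement st = conn.createStatement()) {
  st.execute("COPY my_table FROM 's3://my-bucket/output/part-' "
      + "CREDENTIALS 'aws_access_key_id=<key>;aws_secret_access_key=<secret>' "
      + "DELIMITER '|' GZIP");
}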
How to write mapreduce programming in spark by using java on user-defined javaPairRDD?
Hi, everyone! I've got <key, value> pairs in the form <LongWritable, Text>, where I used the following code: SparkConf conf = new SparkConf().setAppName("MapReduceFileInput"); JavaSparkContext sc = new JavaSparkContext(conf); Configuration confHadoop = new Configuration(); JavaPairRDD<LongWritable, Text> sourceFile = sc.newAPIHadoopFile("hdfs://cMaster:9000/wcinput/data.txt", DataInputFormat.class, LongWritable.class, Text.class, confHadoop); Now I want to transform the JavaPairRDD from <LongWritable, Text> to another <LongWritable, Text>, where the Text content is different. After that, I want to write the Text into HDFS in order of the LongWritable value. But I don't know how to write the map/reduce functions in Spark using Java. Can someone help me? Sincerely, Missie.
Re:
Please take a look at core/src/test/java/org/apache/spark/JavaAPISuite.java in the source code. Cheers On Tue, Jul 7, 2015 at 7:17 AM, 付雅丹 yadanfu1...@gmail.com wrote: Hi, everyone! I've got <key, value> pairs in the form <LongWritable, Text>, where I used the following code: SparkConf conf = new SparkConf().setAppName("MapReduceFileInput"); JavaSparkContext sc = new JavaSparkContext(conf); Configuration confHadoop = new Configuration(); JavaPairRDD<LongWritable, Text> sourceFile = sc.newAPIHadoopFile("hdfs://cMaster:9000/wcinput/data.txt", DataInputFormat.class, LongWritable.class, Text.class, confHadoop); Now I want to transform the JavaPairRDD from <LongWritable, Text> to another <LongWritable, Text>, where the Text content is different. After that, I want to write the Text into HDFS in order of the LongWritable value. But I don't know how to write the map/reduce functions in Spark using Java. Can someone help me? Sincerely, Missie.
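In case a concrete sketch helps alongside the JavaAPISuite examples, something along these lines covers the map / sort-by-key / save-to-HDFS steps (untested; the toUpperCase() call and the output path are only placeholders for whatever transformation and destination you actually want; converting the Hadoop Writables to plain Long/String first avoids their serialization quirks):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

JavaPairRDD<Long, String> transformed = sourceFile.mapToPair(
    new PairFunction<Tuple2<LongWritable, Text>, Long, String>() {
      @Override
      public Tuple2<Long, String> call(Tuple2<LongWritable, Text> kv) {
        // copy out of the Writables and rewrite the text however you need
        // (upper-casing is just a stand-in example)
        return new Tuple2<Long, String>(kv._1().get(), kv._2().toString().toUpperCase());
      }
    });

// sort by the (former LongWritable) key, then write the result to HDFS
transformed.sortByKey(true).saveAsTextFile("hdfs://cMaster:9000/wcoutput");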
Re: Job consistently failing after leftOuterJoin() - oddly sized / non-uniform partitions
Right, I figured I'd need a custom partitioner from what I've read around! Documentation on this is super sparse; do you have any recommended links on solving data skew and/or creating custom partitioners in Spark 1.4? I'd also love to hear if this is an unusual problem for my type of set-up - whether the cluster should be able to handle this if it were somehow configured differently. Thank you, Mo Sent from my iPhone On Jul 6, 2015, at 8:12 PM, ayan guha guha.a...@gmail.com wrote: You can bump up the number of partitions via a parameter in the join operator. However, you have a data skew problem, which you need to resolve using a reasonable partitioning function On 7 Jul 2015 08:57, Mohammed Omer beancinemat...@gmail.com wrote: Afternoon all, Really loving this project and the community behind it. Thank you all for your hard work. This past week, though, I've been having a hard time getting my first deployed job to run without failing at the same point every time: Right after a leftOuterJoin, most partitions (600 total) are small (1-100MB), while some others are large (3-6GB). The large ones consistently spill 20-60GB into memory, and eventually fail. If I could only get the partitions to be smaller, right out of the leftOuterJoin, it seems like the job would run fine. I've tried trawling through the logs, but it hasn't been very fruitful in finding out what, specifically, is the issue. Cluster setup: * 6 worker nodes (16 cores, 104GB Memory, 500GB storage) * 1 master (same config as above) Running Spark on YARN, with: storage.memoryFraction = .3 --executors = 6 --executor-cores = 12 --executor-memory = kind of confusing due to YARN, but basically in the Spark monitor site's Executors page, it shows each as running with 18.8GB memory, though I know usage is much larger due to YARN managing various pieces. (Total memory available to YARN shows 480GB, with 270GB currently used). Screenshot of the task page: http://i.imgur.com/xG3KdEl.png Code: https://gist.github.com/momer/8bc03c60a639e5c04eda#file-spark-scala-L60 (see line 60 for the relevant area) Any pointers in the right direction, or advice on articles to read, or even debugging / settings advice or recommendations would be extremely helpful. I'll put a bounty on this of $50 donation to the ASF! :D Thank you all for reading (and hopefully replying!), Mo Omer
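Not aware of one definitive write-up either, but for what it's worth, a custom partitioner is just a subclass of org.apache.spark.Partitioner, roughly like the sketch below (shown in Java for concreteness, untested; the hash logic is only an example and the partition count is arbitrary). One caveat worth knowing: in a join, all records with the same key must land in the same partition, so no partitioner can split a single very hot key; if the skew comes from a handful of hot key values, the usual workaround is to salt those keys (append a random suffix before the join and strip it afterwards).

import org.apache.spark.Partitioner;

public class MyPartitioner extends Partitioner {
  private final int partitions;

  public MyPartitioner(int partitions) { this.partitions = partitions; }

  @Override
  public int numPartitions() { return partitions; }

  @Override
  public int getPartition(Object key) {
    // example only: hash the key and keep the result non-negative
    int mod = key.hashCode() % partitions;
    return mod < 0 ? mod + partitions : mod;
  }
}

// then hand it to the join instead of relying on the default HashPartitioner, e.g.
// left.leftOuterJoin(right, new MyPartitioner(1200));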
Re: How to solve ThreadException in Apache Spark standalone Java Application
I also tried sc.stop(). Sorry, I did not include that in my question, but I am still getting the thread exception. I should also mention that I am working on a VM. 15/07/07 06:00:32 ERROR ActorSystemImpl: Uncaught error from thread [sparkDriver-akka.actor.default-dispatcher-5] java.lang.InterruptedException: Interrupted while processing system messages at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:265) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 15/07/07 06:00:32 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 15/07/07 06:00:32 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 15/07/07 06:00:32 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down. [WARNING] thread Thread[ForkJoinPool-3-worker-3,5,SimpleApp] was interrupted but is still alive after waiting at least 12877msecs [WARNING] thread Thread[ForkJoinPool-3-worker-3,5,SimpleApp] will linger despite being asked to die via interruption [WARNING] NOTE: 1 thread(s) did not finish despite being asked to via interruption. This is not a problem with exec:java, it is a problem with the running code. Although not serious, it should be remedied. [INFO] Total time: 29.896s -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-solve-ThreadException-in-Apache-Spark-standalone-Java-Application-tp23675p23679.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE:
Requirements – then see my abstracted interpretation – what else do you need in terms of requirements?: “Suppose I have an RDD that is loaded from some file and then I also have a DStream that has data coming from some stream. I want to keep unioning some of the tuples from the DStream into my RDD. For this I can use something like this:” A formal requirements spec derived from the above - I think the actual requirement here is picking up and adding specific (filtered) messages from EVERY DStream RDD to the batch RDD, rather than “preserving” (on top of all that) messages from a sliding window and adding them to the batch RDD. Such a requirement should be defined as the frequency of updates to the batch RDD and what these updates are, e.g. specific filtered messages, and then dstream.window() can be made equal to that frequency. Essentially the update frequency can range from the filtered messages of every single DStream RDD to the filtered messages of a SLIDING WINDOW. Secondly, what do you call “mutable unioning”? This was his initial code: var myRDD: RDD[(String, Long)] = sc.fromText... dstream.foreachRDD{ rdd => myRDD = myRDD.union(rdd.filter(myfilter)) } Here is how it looks when persisting the result from every union – supposed to produce a NEW PERSISTENT IMMUTABLE batch RDD – why is that supposed to be less “stable/reliable” – what are the exact technical reasons for that? var myRDD: RDD[(String, Long)] = sc.fromText... dstream.foreachRDD{ rdd => myRDD = myRDD.union(rdd.filter(myfilter)).cache() } From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Tuesday, July 7, 2015 1:55 PM To: Evo Eftimov Cc: Anand Nalya; spark users Subject: Re: Evo, I'd let the OP clarify the question. I'm not in a position to clarify his requirements beyond what's written in the question. Regarding window vs mutable union: window is a well-supported feature that accumulates messages over time. The mutable unioning of RDDs is bound to run into operational trouble, as there are no warranties tied to data preservation and it's unclear how one can produce 'cuts' of that union ready to be served for some process/computation. Intuitively, it will 'explode' at some point. -kr, Gerard. On Tue, Jul 7, 2015 at 2:06 PM, Evo Eftimov evo.efti...@isecc.com wrote: spark.streaming.unpersist = false // in order for Spark Streaming to not drop the raw RDD data spark.cleaner.ttl = some reasonable value in seconds why is the above suggested, provided the persist/cache operation on the constantly unionized batch RDD will have to be invoked anyway (after every union with a DStream RDD); besides, it will result in DStream RDDs accumulating in RAM unnecessarily for the duration of the TTL re “A more reliable way would be to do dstream.window(...) 
for the length of time you want to keep the data and then union that data with your RDD for further processing using transform.” I think the actual requirement here is picking up and adding specific messages from EVERY DStream RDD to the batch RDD, rather than “preserving” messages from a specific sliding window and adding them to the batch RDD. This should be defined as the frequency of updates to the batch RDD, and then dstream.window() used equal to that frequency. Can you also elaborate on why you consider the dstream.window approach more “reliable”? From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Tuesday, July 7, 2015 12:56 PM To: Anand Nalya Cc: spark users Subject: Re: Anand, AFAIK, you will need to change two settings: spark.streaming.unpersist = false // in order for Spark Streaming to not drop the raw RDD data spark.cleaner.ttl = some reasonable value in seconds Also be aware that the lineage of your union RDD will grow with each batch interval. You will need to break lineage often with cache(), and rely on the ttl for cleanup. You will probably be on some tricky ground with this approach. A more reliable way would be to do dstream.window(...) for the length of time you want to keep the data and then union that data with your RDD for further processing using transform. Something like: dstream.window(Seconds(900), Seconds(900)).transform(rdd => rdd union otherRdd)... If you need an unbounded number of dstream batch intervals, consider writing the data to secondary storage instead. -kr, Gerard. On Tue, Jul 7, 2015 at 1:34 PM, Anand Nalya anand.na...@gmail.com wrote: Hi, Suppose I have an RDD that is loaded from some file and then I also have a DStream that has data coming from some stream. I want to keep unioning some of the tuples from the DStream into my RDD. For this I can use something like this: var myRDD: RDD[(String, Long)] = sc.fromText... dstream.foreachRDD{ rdd => myRDD = myRDD.union(rdd.filter(myfilter)) } My question is: for how long will Spark keep the RDDs underlying the dstream around? Is
Re: Spark standalone cluster - Output file stored in temporary directory in worker
core-site.xml:

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

hdfs-site.xml:

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>file:/usr/local/hadoop_store/hdfs/namenode</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>file:/usr/local/hadoop_store/hdfs/datanode</value>
  </property>
</configuration>

I have not made any changes to the default hadoop-env.sh apart from manually adding the JAVA_HOME entry. What should the properties be configured to? To the master HDFS where the file is actually present? Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-cluster-Output-file-stored-in-temporary-directory-in-worker-tp23653p23683.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
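If the goal is for every node to write to the same HDFS as the master (my assumption from the thread title; the hostname below is a placeholder), then fs.default.name on each worker should point at the namenode rather than localhost, for example:

<property>
  <name>fs.default.name</name>
  <value>hdfs://master-host:9000</value>
</property>

(fs.defaultFS is the newer name for the same setting.) The job can then save to a fully qualified path such as hdfs://master-host:9000/output so there is no ambiguity about which filesystem the part files end up on.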
Re: is it possible to disable -XX:OnOutOfMemoryError=kill %p for the executors?
it seems it is hardcoded in ExecutorRunnable.scala:

val commands = prefixEnv ++ Seq(
    YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
    "-server",
    // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
    // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
    // an inconsistent state.
    // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
    // 'something' to fail job ... akin to blacklisting trackers in mapred ?
    "-XX:OnOutOfMemoryError='kill %p'") ++

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/is-it-possible-to-disable-XX-OnOutOfMemoryError-kill-p-for-the-executors-tp23680p23681.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Master doesn't start, no logs
Can you try renaming ~/.ivy2 to ~/.ivy2_backup, building Spark 1.4.0 again and running it? Thanks Best Regards On Tue, Jul 7, 2015 at 6:27 PM, Max Demoulin maxdemou...@gmail.com wrote: Yes, I do set $SPARK_MASTER_IP. I suspect a more internal issue, maybe due to multiple spark/hdfs instances having successively run on the same machine? -- Henri Maxime Demoulin 2015-07-07 4:10 GMT-04:00 Akhil Das ak...@sigmoidanalytics.com: Strange. What do you have in $SPARK_MASTER_IP? It may happen that it is not able to bind to the given IP, but again, that should be in the logs. Thanks Best Regards On Tue, Jul 7, 2015 at 12:54 AM, maxdml maxdemou...@gmail.com wrote: Hi, I've been compiling Spark 1.4.0 with SBT, from the source tarball available on the official website. I cannot run Spark's master, even though I have built and run several other instances of Spark on the same machine (Spark 1.3, master branch, pre-built 1.4, ...): starting org.apache.spark.deploy.master.Master, logging to /mnt/spark-1.4.0/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-xx.out failed to launch org.apache.spark.deploy.master.Master: full log in /mnt/spark-1.4.0/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-xx.out But the log file is empty. After digging down to ./bin/spark-class, and finally trying to start the master with: ./bin/spark-class org.apache.spark.deploy.master.Master --host 155.99.144.31 I still have the same result. Here is the strace output for this command: http://pastebin.com/bkJVncBm I'm using a 64-bit Xeon, CentOS 6.5, Spark 1.4.0, compiled against Hadoop 2.5.2. Any idea? :-) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Master-doesn-t-start-no-logs-tp23651.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
is it possible to disable -XX:OnOutOfMemoryError=kill %p for the executors?
I get a suspicious SIGTERM on the executors that doesn't seem to come from the driver. The other thing that might send a SIGTERM is the -XX:OnOutOfMemoryError=kill %p Java arg that the executor starts with. My tasks don't seem to run out of memory, so how can I disable this param to debug them? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/is-it-possible-to-disable-XX-OnOutOfMemoryError-kill-p-for-the-executors-tp23680.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Standalone Cluster - Slave not connecting to Master
Hi Himanshu, I am using spark-core_2.10 as my Maven dependency. There were no issues with that. The problem I had was that the Spark master was running on localhost inside the VM and the slave was not able to connect to it. I changed the Spark master to run on the private IP address within the VM and updated the port forwarding tables in the VM to forward all requests to the private address, and I got it to work. Thank you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Standalone-Cluster-Slave-not-connecting-to-Master-tp23572p23682.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
How to verify that the worker is connected to master in CDH5.4
Hi, I have CDH 5.4 installed on a linux server. It has 1 cluster in which spark is deployed as a history server. I am trying to connect my laptop to the spark history server. When I run spark-shell master ip: port number I get the following output How can I verify that the worker is connected to the master? Thanks, Ashish
Re: How to submit streaming application and exit
spark-submit is nothing but a process in your OS, so you should be able to run it in the background and exit. However, in client mode your spark-submit process itself is the driver for your Spark Streaming application, so it will not exit for the lifetime of the streaming app. On Wed, Jul 8, 2015 at 1:13 PM, Bin Wang wbi...@gmail.com wrote: I'm writing a streaming application and want to use spark-submit to submit it to a YARN cluster. I'd like to submit it from a client node and exit spark-submit after the application is running. Is it possible? -- Best Regards, Ayan Guha
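One hedged suggestion on top of that, assuming a Spark version that has the property (it appeared around Spark 1.4): submit in yarn-cluster mode so the driver runs inside the YARN application master rather than in the spark-submit process, and tell the client not to wait for the application to finish. The class and jar names below are placeholders:

spark-submit --master yarn-cluster --conf spark.yarn.submit.waitAppCompletion=false --class com.example.StreamingApp your-streaming-app.jar

With that, spark-submit returns once YARN accepts the application, and the streaming job keeps running in the cluster.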
Re: How to verify that the worker is connected to master in CDH5.4
Hi Ashish, Are you running Spark on YARN on the cluster with an instance of the Spark history server? Also, if you are using Cloudera Manager with Spark on YARN, the Spark on YARN service has a link to the history server web UI. Can you paste the command and the output you are seeing in the thread? Guru Medasani gdm...@gmail.com On Jul 7, 2015, at 10:42 PM, Ashish Dutt ashish.du...@gmail.com wrote: Hi, I have CDH 5.4 installed on a linux server. It has 1 cluster in which spark is deployed as a history server. I am trying to connect my laptop to the spark history server. When I run spark-shell master ip: port number I get the following output How can I verify that the worker is connected to the master? Thanks, Ashish - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to verify that the worker is connected to master in CDH5.4
Thank you Ayan for your response. But I have just realised that Spark is configured as a history server. Can somebody please suggest how I can convert the Spark history server into a master server? Thank you Sincerely, Ashish Dutt On Wed, Jul 8, 2015 at 12:28 PM, ayan guha guha.a...@gmail.com wrote: On the UI? Master: http://masterip:8080 Worker: http://workerIp:8081 On Wed, Jul 8, 2015 at 1:42 PM, Ashish Dutt ashish.du...@gmail.com wrote: Hi, I have CDH 5.4 installed on a Linux server. It has 1 cluster in which Spark is deployed as a history server. I am trying to connect my laptop to the Spark history server. When I run spark-shell master ip: port number I get the following output. How can I verify that the worker is connected to the master? Thanks, Ashish -- Best Regards, Ayan Guha
Re: How to verify that the worker is connected to master in CDH5.4
Hello Guru, Thank you for your quick response. This is what i get when I try executing spark-shell master ip:port number C:\spark-1.4.0\binspark-shell master IP:18088 log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/07/08 11:28:35 INFO SecurityManager: Changing view acls to: Ashish Dutt 15/07/08 11:28:35 INFO SecurityManager: Changing modify acls to: Ashish Dutt 15/07/08 11:28:35 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set (Ashish Dutt); users with modify permissions: Set(Ashish Dutt) 15/07/08 11:28:35 INFO HttpServer: Starting HTTP Server 15/07/08 11:28:35 INFO Utils: Successfully started service 'HTTP class server' on port 52767. Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.4.0 /_/ Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79) Type in expressions to have them evaluated. Type :help for more information. 15/07/08 11:28:39 INFO SparkContext: Running Spark version 1.4.0 15/07/08 11:28:39 INFO SecurityManager: Changing view acls to: Ashish Dutt 15/07/08 11:28:39 INFO SecurityManager: Changing modify acls to: Ashish Dutt 15/07/08 11:28:39 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set (Ashish Dutt); users with modify permissions: Set(Ashish Dutt) 15/07/08 11:28:40 INFO Slf4jLogger: Slf4jLogger started 15/07/08 11:28:40 INFO Remoting: Starting remoting 15/07/08 11:28:40 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.228.208.74:52780] 15/07/08 11:28:40 INFO Utils: Successfully started service 'sparkDriver' on port 52780. 15/07/08 11:28:40 INFO SparkEnv: Registering MapOutputTracker 15/07/08 11:28:40 INFO SparkEnv: Registering BlockManagerMaster 15/07/08 11:28:40 INFO DiskBlockManager: Created local directory at C:\Users\Ashish Dutt\AppData\Local\Temp\spark-80c4f1fe-37de-4aef -9063-cae29c488382\blockmgr-a967422b-05e8-4fc1-b60b-facc7dbd4414 15/07/08 11:28:40 INFO MemoryStore: MemoryStore started with capacity 265.4 MB 15/07/08 11:28:40 INFO HttpFileServer: HTTP File server directory is C:\Users\Ashish Dutt\AppData\Local\Temp\spark-80c4f1fe-37de-4ae f-9063-cae29c488382\httpd-928f4485-ea08-4749-a478-59708db0fefa 15/07/08 11:28:40 INFO HttpServer: Starting HTTP Server 15/07/08 11:28:40 INFO Utils: Successfully started service 'HTTP file server' on port 52781. 15/07/08 11:28:40 INFO SparkEnv: Registering OutputCommitCoordinator 15/07/08 11:28:40 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/07/08 11:28:40 INFO SparkUI: Started SparkUI at http://10.228.208.74:4040 15/07/08 11:28:40 INFO Executor: Starting executor ID driver on host localhost 15/07/08 11:28:41 INFO Executor: Using REPL class URI: http://10.228.208.74:52767 15/07/08 11:28:41 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52800. 
15/07/08 11:28:41 INFO NettyBlockTransferService: Server created on 52800 15/07/08 11:28:41 INFO BlockManagerMaster: Trying to register BlockManager 15/07/08 11:28:41 INFO BlockManagerMasterEndpoint: Registering block manager localhost:52800 with 265.4 MB RAM, BlockManagerId(drive r, localhost, 52800) 15/07/08 11:28:41 INFO BlockManagerMaster: Registered BlockManager 15/07/08 11:28:41 INFO SparkILoop: Created spark context.. Spark context available as sc. 15/07/08 11:28:41 INFO HiveContext: Initializing execution hive, version 0.13.1 15/07/08 11:28:42 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/07/08 11:28:42 INFO ObjectStore: ObjectStore, initialize called 15/07/08 11:28:42 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored 15/07/08 11:28:42 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored 15/07/08 11:28:42 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 15/07/08 11:28:42 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 15/07/08 11:28:52 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=Table,StorageDescrip tor,SerDeInfo,Partition,Database,Type,FieldSchema,Order 15/07/08 11:28:52 INFO MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5. Enc ountered: @ (64), after : . 15/07/08 11:28:53 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so do es not have its own datastore table. 15/07/08 11:28:53 INFO
Re: How to create empty RDD
It worked, Zhou. On Mon, Jul 6, 2015 at 10:43 PM, Wei Zhou zhweisop...@gmail.com wrote: I used val output: RDD[(DetailInputRecord, VISummary)] = sc.emptyRDD[(DetailInputRecord, VISummary)] to create an empty RDD before. Give it a try, it might work for you too. 2015-07-06 14:11 GMT-07:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: I need to return an empty RDD of type val output: RDD[(DetailInputRecord, VISummary)] This does not work: val output: RDD[(DetailInputRecord, VISummary)] = new RDD() as RDD is an abstract class. How do I create an empty RDD? -- Deepak -- Deepak
Catalyst Errors when building spark from trunk
The following errors are occurring upon building using mvn options clean package Are there some requirements/restrictions on profiles/settings for catalyst to build properly? [error] /shared/sparkup2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala:138: value length is not a member of org.apache.spark.unsafe.types.UTF8String [error] buildCast[UTF8String](_, _.length() != 0) [error] ^ [error] /shared/sparkup2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala:282: value length is not a member of org.apache.spark.unsafe.types.UTF8String [error] val (st, end) = slicePos(start, length, () = s.length()) [error] ^ [error] /shared/sparkup2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala:283: type mismatch; [error] found : Any [error] required: Int [error] s.substring(st, end) [error] ^ [error] /shared/sparkup2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala:283: type mismatch; [error] found : Any [error] required: Int [error] s.substring(st, end) [error] ^ [error] /shared/sparkup2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringOperations.scala:304: value length is not a member of org.apache.spark.unsafe.types.UTF8String [error] if (string == null) null else string.asInstanceOf[UTF8String].length [error] ^ [warn] three warnings found [error] 5 errors found [error] Compile failed at Jul 7, 2015 9:43:44 PM [19.378s]
Re: How to verify that the worker is connected to master in CDH5.4
Hi Ashish, If you are not using Spark on YARN and instead using Spark Standalone, you don’t need Spark history server. More on the Web Interfaces is provided in the following link. Since are using standalone mode, you should be able to access the web UI for the master and workers at ports that Ayan provided in early email. Master: http://masterip:8080 Worker: http://workerIp:8081 https://spark.apache.org/docs/latest/monitoring.html https://spark.apache.org/docs/latest/monitoring.html If you are using Spark on YARN, spark history server is configured to run on port 18080 by default on the server where Spark history server is running. Guru Medasani gdm...@gmail.com On Jul 8, 2015, at 12:01 AM, Ashish Dutt ashish.du...@gmail.com wrote: Hello Guru, Thank you for your quick response. This is what i get when I try executing spark-shell master ip:port number C:\spark-1.4.0\binspark-shell master IP:18088 log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/07/08 11:28:35 INFO SecurityManager: Changing view acls to: Ashish Dutt 15/07/08 11:28:35 INFO SecurityManager: Changing modify acls to: Ashish Dutt 15/07/08 11:28:35 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set (Ashish Dutt); users with modify permissions: Set(Ashish Dutt) 15/07/08 11:28:35 INFO HttpServer: Starting HTTP Server 15/07/08 11:28:35 INFO Utils: Successfully started service 'HTTP class server' on port 52767. Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.4.0 /_/ Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79) Type in expressions to have them evaluated. Type :help for more information. 15/07/08 11:28:39 INFO SparkContext: Running Spark version 1.4.0 15/07/08 11:28:39 INFO SecurityManager: Changing view acls to: Ashish Dutt 15/07/08 11:28:39 INFO SecurityManager: Changing modify acls to: Ashish Dutt 15/07/08 11:28:39 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set (Ashish Dutt); users with modify permissions: Set(Ashish Dutt) 15/07/08 11:28:40 INFO Slf4jLogger: Slf4jLogger started 15/07/08 11:28:40 INFO Remoting: Starting remoting 15/07/08 11:28:40 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.228.208.74:52780 http://sparkDriver@10.228.208.74:52780/] 15/07/08 11:28:40 INFO Utils: Successfully started service 'sparkDriver' on port 52780. 
15/07/08 11:28:40 INFO SparkEnv: Registering MapOutputTracker 15/07/08 11:28:40 INFO SparkEnv: Registering BlockManagerMaster 15/07/08 11:28:40 INFO DiskBlockManager: Created local directory at C:\Users\Ashish Dutt\AppData\Local\Temp\spark-80c4f1fe-37de-4aef -9063-cae29c488382\blockmgr-a967422b-05e8-4fc1-b60b-facc7dbd4414 15/07/08 11:28:40 INFO MemoryStore: MemoryStore started with capacity 265.4 MB 15/07/08 11:28:40 INFO HttpFileServer: HTTP File server directory is C:\Users\Ashish Dutt\AppData\Local\Temp\spark-80c4f1fe-37de-4ae f-9063-cae29c488382\httpd-928f4485-ea08-4749-a478-59708db0fefa 15/07/08 11:28:40 INFO HttpServer: Starting HTTP Server 15/07/08 11:28:40 INFO Utils: Successfully started service 'HTTP file server' on port 52781. 15/07/08 11:28:40 INFO SparkEnv: Registering OutputCommitCoordinator 15/07/08 11:28:40 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/07/08 11:28:40 INFO SparkUI: Started SparkUI at http://10.228.208.74:4040 http://10.228.208.74:4040/ 15/07/08 11:28:40 INFO Executor: Starting executor ID driver on host localhost 15/07/08 11:28:41 INFO Executor: Using REPL class URI: http://10.228.208.74:52767 http://10.228.208.74:52767/ 15/07/08 11:28:41 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52800. 15/07/08 11:28:41 INFO NettyBlockTransferService: Server created on 52800 15/07/08 11:28:41 INFO BlockManagerMaster: Trying to register BlockManager 15/07/08 11:28:41 INFO BlockManagerMasterEndpoint: Registering block manager localhost:52800 with 265.4 MB RAM, BlockManagerId(drive r, localhost, 52800) 15/07/08 11:28:41 INFO BlockManagerMaster: Registered BlockManager 15/07/08 11:28:41 INFO SparkILoop: Created spark context.. Spark context available as sc. 15/07/08 11:28:41 INFO HiveContext: Initializing execution hive, version 0.13.1 15/07/08 11:28:42 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/07/08 11:28:42
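One more hedged pointer for the original goal of connecting the laptop's spark-shell to the cluster: the master has to be given via the --master flag; a bare "master IP:port" on the command line is treated as application arguments, so the shell silently falls back to local mode, which is what the quoted log shows with "Starting executor ID driver on host localhost". Roughly:

spark-shell --master spark://<master-host>:7077
  (standalone cluster)
spark-shell --master yarn-client
  (Spark on YARN; needs HADOOP_CONF_DIR/YARN_CONF_DIR pointing at the cluster's client configuration)

The history server port (18080 by default, 18088 on CDH) only serves a web UI for completed applications; it is not something spark-shell can connect to as a master.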
how to use DoubleRDDFunctions on mllib Vector?
Hi, there are some useful functions in DoubleRDDFunctions that I can use if I have an RDD[Double], e.g. mean and variance. Vector doesn't have such methods. How can I convert a Vector to an RDD[Double], or, even better, is there a way to call mean directly on a Vector?
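A small hedged sketch (Java, untested): an mllib Vector has toArray(), so one option is to parallelize that into a JavaDoubleRDD, which exposes mean() and variance(). For a single local Vector it may be simpler to compute the mean from toArray() without Spark at all, and if you have an RDD of many Vectors, org.apache.spark.mllib.stat.Statistics.colStats gives per-column mean and variance without any conversion.

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Vector;

// vec is your existing mllib Vector, sc your JavaSparkContext
List<Double> values = new ArrayList<Double>();
for (double d : vec.toArray()) {
  values.add(d);
}
JavaDoubleRDD asRdd = sc.parallelizeDoubles(values);
double mean = asRdd.mean();
double variance = asRdd.variance();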
Re: How to verify that the worker is connected to master in CDH5.4
On UI? Master: http://masterip:8080 Worker: http://workerIp:8081 On Wed, Jul 8, 2015 at 1:42 PM, Ashish Dutt ashish.du...@gmail.com wrote: Hi, I have CDH 5.4 installed on a linux server. It has 1 cluster in which spark is deployed as a history server. I am trying to connect my laptop to the spark history server. When I run spark-shell master ip: port number I get the following output How can I verify that the worker is connected to the master? Thanks, Ashish -- Best Regards, Ayan Guha
Re: How to verify that the worker is connected to master in CDH5.4
Hello Guru, Many thanks for your reply. I am new to this who thing. So pardon me for my naiivety at times. I am not sure if I am using Spark standalone or Spark on Yarn because when I check the port number of Spark it shows it as 18088 and like you have mentioned maybe it is then Spark on Yarn. All I want for now is how to connect my laptop to the spark cluster machine using either pyspark or SparkR. (I have python 2.7) On my laptop I am using winutils in place of hadoop and have spark 1.4 installed Thank you Sincerely, Ashish Dutt PhD Candidate Department of Information Systems University of Malaya, Lembah Pantai, 50603 Kuala Lumpur, Malaysia On Wed, Jul 8, 2015 at 1:13 PM, Guru Medasani gdm...@gmail.com wrote: Hi Ashish, If you are not using Spark on YARN and instead using Spark Standalone, you don’t need Spark history server. More on the Web Interfaces is provided in the following link. Since are using standalone mode, you should be able to access the web UI for the master and workers at ports that Ayan provided in early email. Master: http://masterip:8080 Worker: http://workerIp:8081 https://spark.apache.org/docs/latest/monitoring.html If you are using Spark on YARN, spark history server is configured to run on port 18080 by default on the server where Spark history server is running. Guru Medasani gdm...@gmail.com On Jul 8, 2015, at 12:01 AM, Ashish Dutt ashish.du...@gmail.com wrote: Hello Guru, Thank you for your quick response. This is what i get when I try executing spark-shell master ip:port number C:\spark-1.4.0\binspark-shell master IP:18088 log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/07/08 11:28:35 INFO SecurityManager: Changing view acls to: Ashish Dutt 15/07/08 11:28:35 INFO SecurityManager: Changing modify acls to: Ashish Dutt 15/07/08 11:28:35 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set (Ashish Dutt); users with modify permissions: Set(Ashish Dutt) 15/07/08 11:28:35 INFO HttpServer: Starting HTTP Server 15/07/08 11:28:35 INFO Utils: Successfully started service 'HTTP class server' on port 52767. Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.4.0 /_/ Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79) Type in expressions to have them evaluated. Type :help for more information. 15/07/08 11:28:39 INFO SparkContext: Running Spark version 1.4.0 15/07/08 11:28:39 INFO SecurityManager: Changing view acls to: Ashish Dutt 15/07/08 11:28:39 INFO SecurityManager: Changing modify acls to: Ashish Dutt 15/07/08 11:28:39 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set (Ashish Dutt); users with modify permissions: Set(Ashish Dutt) 15/07/08 11:28:40 INFO Slf4jLogger: Slf4jLogger started 15/07/08 11:28:40 INFO Remoting: Starting remoting 15/07/08 11:28:40 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.228.208.74:52780] 15/07/08 11:28:40 INFO Utils: Successfully started service 'sparkDriver' on port 52780. 
15/07/08 11:28:40 INFO SparkEnv: Registering MapOutputTracker 15/07/08 11:28:40 INFO SparkEnv: Registering BlockManagerMaster 15/07/08 11:28:40 INFO DiskBlockManager: Created local directory at C:\Users\Ashish Dutt\AppData\Local\Temp\spark-80c4f1fe-37de-4aef -9063-cae29c488382\blockmgr-a967422b-05e8-4fc1-b60b-facc7dbd4414 15/07/08 11:28:40 INFO MemoryStore: MemoryStore started with capacity 265.4 MB 15/07/08 11:28:40 INFO HttpFileServer: HTTP File server directory is C:\Users\Ashish Dutt\AppData\Local\Temp\spark-80c4f1fe-37de-4ae f-9063-cae29c488382\httpd-928f4485-ea08-4749-a478-59708db0fefa 15/07/08 11:28:40 INFO HttpServer: Starting HTTP Server 15/07/08 11:28:40 INFO Utils: Successfully started service 'HTTP file server' on port 52781. 15/07/08 11:28:40 INFO SparkEnv: Registering OutputCommitCoordinator 15/07/08 11:28:40 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/07/08 11:28:40 INFO SparkUI: Started SparkUI at http://10.228.208.74:4040 15/07/08 11:28:40 INFO Executor: Starting executor ID driver on host localhost 15/07/08 11:28:41 INFO Executor: Using REPL class URI: http://10.228.208.74:52767 15/07/08 11:28:41 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52800. 15/07/08 11:28:41 INFO NettyBlockTransferService: Server created on 52800 15/07/08 11:28:41 INFO BlockManagerMaster: Trying to register BlockManager 15/07/08 11:28:41 INFO
Re: How do we control output part files created by Spark job?
Hi Srikanth, thanks for the response. I have the following code: hiveContext.sql("insert into ... ").coalesce(6) The above code does not create 6 part files; it creates around 200 small files. Please guide. Thanks. On Jul 8, 2015 4:07 AM, Srikanth srikanth...@gmail.com wrote: Did you do yourRdd.coalesce(6).saveAsTextFile() or yourRdd.coalesce(6); yourRdd.saveAsTextFile()? Srikanth On Tue, Jul 7, 2015 at 12:59 PM, Umesh Kacha umesh.ka...@gmail.com wrote: Hi, I tried both approaches using df.repartition(6) and df.coalesce(6); it doesn't reduce the number of part files. Even after calling the above methods I still see around 200 small part files of size 20 MB each, which are again ORC files. On Tue, Jul 7, 2015 at 12:52 AM, Sathish Kumaran Vairavelu vsathishkuma...@gmail.com wrote: Try the coalesce function to limit the number of part files On Mon, Jul 6, 2015 at 1:23 PM kachau umesh.ka...@gmail.com wrote: Hi, I have a couple of Spark jobs which process thousands of files every day. File size may vary from MBs to GBs. After finishing a job I usually save using the following code: finalJavaRDD.saveAsParquetFile("/path/in/hdfs"); OR dataFrame.write.format("orc").save("/path/in/hdfs") // storing as an ORC file as of Spark 1.4 The Spark job creates plenty of small part files in the final output directory. As far as I understand, Spark creates a part file for each partition/task; please correct me if I am wrong. How do we control the number of part files Spark creates? Finally I would like to create a Hive table using these Parquet/ORC directories, and I heard Hive is slow when we have a large number of small files. Please guide, I am new to Spark. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
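One thing worth noting, as a guess rather than a certainty: coalesce(6) applied to the DataFrame returned by hiveContext.sql("insert into ...") cannot affect the files produced by the insert itself; the repartitioning has to happen on the data before it is written. A rough Spark 1.4-style sketch (untested; table names and the output path are placeholders, hiveContext is assumed to exist):

import org.apache.spark.sql.DataFrame;

// read the source data first, shrink it to 6 partitions, then write
DataFrame source = hiveContext.table("source_table");   // or hiveContext.sql("select ... from source_table")
source.coalesce(6)                                       // 6 partitions -> roughly 6 part files
      .write()
      .insertInto("target_table");                       // or .format("orc").save("/path/in/hdfs")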
Re: HiveContext throws org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
Hi, bdev Derby is the default embedded DB for Hive MetaStore if you do not specify a hive.metastore.uris, please take a look at the lib directory of hive, you can find out derby jar there, Spark does not require derby by default At 2015-07-07 17:07:28, bdev buntu...@gmail.com wrote: Just trying to get started with Spark and attempting to use HiveContext using spark-shell to interact with existing Hive tables on my CDH cluster but keep running into the errors (pls see below) when I do 'hiveContext.sql(show tables)'. Wanted to know what all JARs need to be included to have this working. Thanks! java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:472) at org.apache.spark.sql.hive.HiveContext.sessionState$lzycompute(HiveContext.scala:229) at org.apache.spark.sql.hive.HiveContext.sessionState(HiveContext.scala:225) at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:241) at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:240) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:86) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:31) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:36) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:38) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:40) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:42) at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:44) at $iwC$$iwC$$iwC$$iwC.init(console:46) at $iwC$$iwC$$iwC.init(console:48) at $iwC$$iwC.init(console:50) at $iwC.init(console:52) at init(console:54) at .init(console:58) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058) at 
org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1488) at
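A hedged follow-up on the original question about which jars are needed: in many setups the missing piece is not a jar at all but the Hive client configuration, so that Spark talks to the existing metastore instead of trying to spin up an embedded Derby one. On CDH that typically means making hive-site.xml visible to spark-shell, for example:

cp /etc/hive/conf/hive-site.xml $SPARK_HOME/conf/

(The paths are an assumption based on a typical CDH layout; the important bit is that hive.metastore.uris in that file points at the running metastore service.)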
Re: Maintain Persistent Connection with Hive meta store
Each time you run the jar, a new JVM is started, so maintaining a connection between different JVMs is not the right way to think about "each time when I run that jar it tries to make connection with hive metastore". At 2015-07-07 17:07:06, wazza rajeshkumarit8...@gmail.com wrote: Hi, I am new to Apache Spark and I have tried to query Hive tables using Apache Spark SQL. First I tried it in spark-shell, where I can query 1 lakh records from a Hive table within a second. Then I tried it in Java code, which always takes more than 10 seconds, and I have noted that each time I run that jar it tries to make a connection with the Hive metastore. Can anyone tell me how to maintain the connection between Apache Spark and the Hive metastore, or else how to achieve the same in Java? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Maintain-Persistent-Connection-with-Hive-meta-store-tp23664.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org