Re: Re: Spark streaming doesn't print output when working with standalone master
local[3] spawns 3 threads on 1 core :) Thanks Best Regards

On Fri, Feb 20, 2015 at 12:50 PM, bit1...@163.com bit1...@163.com wrote:

Thanks Akhil, you are right. I checked and found that I have only 1 core allocated to the program. I am running on a virtual machine and allocated only one processor to it (1 core per processor), so even though I specified --total-executor-cores 3 in the submit script, the application is still allocated only one processor.

This leads me to another question: although I have only one core, I specified the master and executor as --master local[3] --executor-memory 512M --total-executor-cores 3. Since I have only one core, why does this work?

-- bit1...@163.com

From: Akhil Das ak...@sigmoidanalytics.com
Date: 2015-02-20 15:13
To: bit1...@163.com
CC: user user@spark.apache.org
Subject: Re: Spark streaming doesn't print output when working with standalone master

While running the program, go to your cluster's web UI (that runs on 8080, prolly at hadoop.master:8080) and see how many cores are allocated to the program; it should be >= 2 for the stream to get processed.

Thanks Best Regards

On Fri, Feb 20, 2015 at 9:29 AM, bit1...@163.com bit1...@163.com wrote:

Hi, I am trying the Spark streaming log analysis reference application provided by Databricks at https://github.com/databricks/reference-apps/tree/master/logs_analyzer

When I deploy the code to the standalone cluster, there is no output at all with the following shell script, which means the windowDStream has 0 RDDs:

./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master spark://hadoop.master:7077 --executor-memory 512M --total-executor-cores 3 --class spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming LogApp.jar

But when I change --master to --master local[3], the program starts to work fine. Can anyone give some advice? Thanks!

./spark-submit --deploy-mode client --name LogAnalyzerStreaming --master local[3] --executor-memory 512M --total-executor-cores 3 --class spark.examples.databricks.reference.apps.loganalysis.LogAnalyzerStreaming LogApp.jar

object LogAnalyzerStreaming {

  val WINDOW_LENGTH = new Duration(12 * 1000)
  val SLIDE_INTERVAL = new Duration(6 * 1000)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Log Analyzer Streaming in Scala")
    val sc = new SparkContext(sparkConf)
    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)

    val logLinesDStream = streamingContext.socketTextStream("localhost", )
    val accessLogsDStream = logLinesDStream.map(ApacheAccessLog.parseLogLine).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.count() == 0) {
        println("No access com.databricks.app.logs received in this time interval")
      } else {
        // Calculate statistics based on the content size.
        val contentSizes = accessLogs.map(log => log.contentSize).cache()
        println("Content Size Avg: %s, Min: %s, Max: %s".format(
          contentSizes.reduce(_ + _) / contentSizes.count,
          contentSizes.min,
          contentSizes.max))
      }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
Regarding shuffle data file format
Hi, What file format is used to write files during a shuffle write? Does it depend on the Spark shuffle manager or on the output format? Is it possible to change the file format used for shuffle, irrespective of the output format of the file? Thanks, Twinkle
Re: RDD Partition number
Hi All, Thanks for your answers. I have one more detail to point out. It is clear now how the partition number is defined for an HDFS file. However, suppose I have my dataset replicated on all the machines at the same absolute path, where each machine has, for instance, an ext3 filesystem. If I load the file into an RDD, how many partitions are defined in this case, and why? I found that Spark defines a number, say K, of partitions. If I force the partition count to be < K, my parameter is ignored. If I set a value K* >= K, then Spark sets K* partitions. Thanks for your help Alessandro

On Thu, Feb 19, 2015 at 6:27 PM, Ted Yu yuzhih...@gmail.com wrote: bq. *blocks being 64MB by default in HDFS* *In hadoop 2.1+, default block size has been increased.* See https://issues.apache.org/jira/browse/HDFS-4053 Cheers

On Thu, Feb 19, 2015 at 8:32 AM, Ted Yu yuzhih...@gmail.com wrote: What file system are you using? If you use HDFS, the documentation you cited is pretty clear on how partitions are determined. bq. file X replicated on 4 machines I don't think the replication factor plays a role w.r.t. partitions.

On Thu, Feb 19, 2015 at 8:05 AM, Alessandro Lulli lu...@di.unipi.it wrote: Hi All, Could you please help me understand how Spark defines the number of partitions of an RDD if not specified? I found the following in the documentation for files loaded from HDFS: *The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.* What is the rule for files loaded from the local file system? For instance, I have a file X replicated on 4 machines. If I load the file X into an RDD, how many partitions are defined and why? Thanks for your help on this Alessandro
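For what it's worth, a quick way to observe the behaviour being discussed is to compare the partition count Spark picks by default with the count after passing a minPartitions hint (a minimal sketch; the path is a placeholder):

    val rdd = sc.textFile("/path/on/each/machine/data.txt")
    println(rdd.partitions.length)                              // the K that Spark chose

    val rdd16 = sc.textFile("/path/on/each/machine/data.txt", 16)
    println(rdd16.partitions.length)                            // honoured once it exceeds K

The second argument is only a lower bound, which matches the documented rule that you cannot end up with fewer partitions than input splits.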
Re: Spark on Mesos: Multiple Users with iPython Notebooks
On Thu, Feb 19, 2015 at 2:49 PM, John Omernik j...@omernik.com wrote:

I am running Spark on Mesos and it works quite well. I have three users, all of whom set up iPython notebooks to instantiate a Spark instance to work with in the notebooks. I love it so far. Since I am auto-instantiating (I don't want a user to have to think about instantiating and submitting a Spark app to do ad hoc analysis; I want the environment set up ahead of time), this is done whenever an iPython notebook is opened. So far it's working pretty well, save one issue: every notebook is a new driver. I.e., every time they open a notebook, a new spark-submit is called and the driver resources are allocated, regardless of whether they are used or not. Yes, it's only the driver, but even that I find starts slowing down my queries for the notebooks using Spark. (I am running in Mesos fine-grained mode.)

I have three users on my system. Ideally, I would love to find a way so that on the first notebook being opened, a driver is started for that user and can then be used for any notebook the user has open. So if they open a new notebook, I can check that yes, the user has a Spark driver running, and thus that notebook, if there is a query, will run it through that driver. That allows me to understand the resource allocation better, and it limits users from running 10 notebooks and holding a lot of resources.

The other thing I was wondering is: could the driver actually be run on the Mesos cluster? Right now, I have an edge node as an iPython server; the drivers all exist on that server, so as I get more and more drivers, the box's local resources get depleted by unused drivers. Obviously, if I could reuse the drivers per user on that box, that is a great first step, but if I could reuse drivers and run them on the cluster, that would be ideal. Looking through the docs I was not clear on those options. If anyone could point me in the right direction, I would greatly appreciate it!

Cluster mode support for Spark is tracked under SPARK-5338 (https://issues.apache.org/jira/browse/SPARK-5338). I know Tim Chen is working on it, so there will be progress soon. iulian

-- Iulian Dragos -- Reactive Apps on the JVM www.typesafe.com
Re: Spark Performance on Yarn
None of this really points to the problem. These indicate that workers died, but not why. I'd first go locate the executor logs, which reveal more about what's happening. It sounds like a harder type of failure, like a JVM crash, running out of file handles, or GC thrashing.

On Fri, Feb 20, 2015 at 4:51 AM, lbierman leebier...@gmail.com wrote:

I'm a bit new to Spark, but had a question on performance. I suspect a lot of my issue is due to tuning and parameters. I have a Hive external table on this data, and running queries against it takes minutes.

The job:
+ 40 GB of Avro events on HDFS (100 million+ Avro events)
+ Read in the files from HDFS and dedupe events by key (mapToPair then a reduceByKey)
+ RDD returned and persisted (disk and memory)
+ Then passed to a job that takes the RDD, does a mapToPair of new object data, then reduceByKey and foreachPartition to do work

The issue: When I run this in my environment on Yarn, it takes 20+ hours. Running on Yarn, we see the first stage run to build the deduped RDD, but then when the next stage starts, things fail and data is lost. This results in stage 0 starting over and over and just dragging it out.

Errors I see in the driver logs:

ERROR cluster.YarnClientClusterScheduler: Lost executor 1 on X: remote Akka client disassociated
15/02/20 00:27:36 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.1 (TID 1335,): FetchFailed(BlockManagerId(3, i, 33958), shuffleId=1, mapId=162, reduceId=0, message= org.apache.spark.shuffle.FetchFailedException: Failed to connect toX/X:33958

Also we see this, but I'm suspecting this is because the previous stage fails and the next one starts:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1

Cluster: 5 machines, each with 2 cores and 8 GB of memory.

Spark-submit command:

spark-submit --class com.myco.SparkJob \
  --master yarn \
  /tmp/sparkjob.jar

Any thoughts on where to look, how to start approaching this problem, or more data points to present? Thanks.

Code for the job:

JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>) context.newAPIHadoopRDD(
    context.hadoopConfiguration(),
    AvroKeyInputFormat.class,
    AvroKey.class,
    NullWritable.class
  ).keys())
  .map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
  .filter(key -> { return Optional.ofNullable(key.getStepEventKey()).isPresent(); })
  .mapToPair(event -> new Tuple2<AnalyticsEvent, Integer>(event, 1))
  .reduceByKey((analyticsEvent1, analyticsEvent2) -> analyticsEvent1)
  .map(tuple -> tuple._1());

events.persist(StorageLevel.MEMORY_AND_DISK_2());

events.mapToPair(event -> {
    return new Tuple2<T, RunningAggregates>(
      keySelector.select(event),
      new RunningAggregates(
        Optional.ofNullable(event.getVisitors()).orElse(0L),
        Optional.ofNullable(event.getImpressions()).orElse(0L),
        Optional.ofNullable(event.getAmount()).orElse(0.0D),
        Optional.ofNullable(event.getAmountSumOfSquares()).orElse(0.0D)));
  })
  .reduceByKey((left, right) -> { return left.add(right); })
  .foreachPartition(dostuff)
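As a side note on the submit command quoted above: it requests no executor resources at all, so YARN falls back to its small defaults. A hedged sketch of a more explicit submission (the numbers are purely illustrative for 2-core / 8 GB nodes, not a recommendation; actual sizing should follow what the executor logs reveal):

    spark-submit --class com.myco.SparkJob \
      --master yarn \
      --num-executors 4 \
      --executor-cores 2 \
      --executor-memory 4g \
      --conf spark.yarn.executor.memoryOverhead=1024 \
      /tmp/sparkjob.jar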
Accumulator in SparkUI for streaming
On Spark 1.2: I am trying to capture the number of records read from a Kafka topic:

val inRecords = ssc.sparkContext.accumulator(0, "InRecords")
..
kInStreams.foreach( k => {
  k.foreachRDD ( rdd => inRecords += rdd.count().toInt )
  inRecords.value

Question is, how do I get the accumulator to show up in the UI? I tried inRecords.value but that didn't help. Pretty sure it isn't showing up in the stage metrics. What's the trick here? collect? Thanks, Tim
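If I remember the web UI behaviour correctly, named accumulators are only listed on the detail page of stages whose *tasks* update them; incrementing the accumulator on the driver, as inRecords += rdd.count().toInt does, never ties it to any stage. A sketch of the task-side variant (at the cost of a per-element increment instead of a single count):

    val inRecords = ssc.sparkContext.accumulator(0, "InRecords")  // the name is what the UI displays

    kInStreams.foreach { k =>
      k.foreachRDD { rdd =>
        rdd.foreach(_ => inRecords += 1)                 // updated inside tasks, so stage pages can show it
        println("records so far: " + inRecords.value)    // driver-side visibility per batch
      }
    }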
DataFrame: Enable zipWithUniqueId
Hello, A question regarding the new DataFrame API introduced here: https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html

I oftentimes use the zipWithUniqueId method of the SchemaRDD (as an RDD) to replace string keys with more efficient long keys. Would it be possible to use the same method in the new DataFrame class? It looks like, unlike SchemaRDD, DataFrame does not extend RDD. Thanks Dima

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-Enable-zipWithUniqueId-tp21733.html
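Until such a method exists on DataFrame itself, one workaround is to drop down to the underlying RDD[Row] via .rdd. A minimal sketch, where the assumption that the string key sits in column 0 is purely illustrative:

    import org.apache.spark.sql.Row

    // df is an existing DataFrame
    val keyed = df.rdd.zipWithUniqueId().map { case (row: Row, id: Long) =>
      (id, row.getString(0))   // long surrogate key paired with the old string key
    }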
Re: Streaming Linear Regression
Hello Baris, Giving your complete source code (if not very long, or maybe via https://gist.github.com/) could be more helpful. Also telling which Spark version you use, on which file system, and how you run your application, together with any log / output info it produces, might make collective debugging easier. -- Emre Sevinç http://www.bigindustries.be/

On Thu, Feb 19, 2015 at 9:01 PM, barisak baris.akg...@gmail.com wrote:

Hi, I tried to run streaming linear regression locally:

val trainingData = ssc.textFileStream("/home/barisakgu/Desktop/Spark/train").map(LabeledPoint.parse)

textFileStream is not seeing the new files. I searched on the Internet, and I saw that somebody had the same issue, but no solution was found for it. Is there any opinion on this? Has anybody managed to run streaming linear regression? Thanks

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Linear-Regression-tp21726.html
Re: using a database connection pool to write data into an RDBMS from a Spark application
Although I don't know if it's related, the Class.forName() method of loading drivers is very old. You should be using DataSource and javax.sql; this has been the usual practice since about Java 1.4. Why do you say a different driver is being loaded? That's not the error here. Try instantiating the driver directly to test whether it's available in the classpath. Otherwise you would have to check whether the jar exists, whether the class exists in it, and whether it's really on your classpath.

On Fri, Feb 20, 2015 at 5:27 AM, Mohammed Guller moham...@glassbeam.com wrote:

Hi Kelvin, Yes. I am creating an uber jar with the Postgres driver included, but nevertheless tried both --jars and --driver-classpath flags. It didn't help. Interestingly, I can't use BoneCP even in the driver program when I run my application with spark-submit. I am getting the same exception when the application initializes BoneCP before creating the SparkContext. It looks like Spark is loading a different version of the Postgres JDBC driver than the one that I am linking. Mohammed

From: Kelvin Chu [mailto:2dot7kel...@gmail.com]
Sent: Thursday, February 19, 2015 7:56 PM
To: Mohammed Guller
Cc: user@spark.apache.org
Subject: Re: using a database connection pool to write data into an RDBMS from a Spark application

Hi Mohammed, Did you use --jars to specify your jdbc driver when you submitted your job? Take a look at this link: http://spark.apache.org/docs/1.2.0/submitting-applications.html Hope this helps! Kelvin

On Thu, Feb 19, 2015 at 7:24 PM, Mohammed Guller moham...@glassbeam.com wrote:

Hi - I am trying to use BoneCP (a database connection pooling library) to write data from my Spark application to an RDBMS. The database inserts are inside a foreachPartition code block. I am getting this exception when the code tries to insert data using BoneCP:

java.sql.SQLException: No suitable driver found for jdbc:postgresql://hostname:5432/dbname

I tried explicitly loading the Postgres driver on the worker nodes by adding the following line inside the foreachPartition code block:

Class.forName("org.postgresql.Driver")

It didn't help. Has anybody been able to get a database connection pool library to work with Spark? If you got it working, can you please share the steps? Thanks, Mohammed
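For reference, a pool-free sketch of the foreachPartition pattern that also instantiates the driver directly, as suggested above (the URL, credentials, table, and the row-to-column mapping are placeholders):

    import java.sql.DriverManager

    rdd.foreachPartition { rows =>
      // instantiating the driver directly fails fast if it is missing on the executor classpath
      val driver = new org.postgresql.Driver()
      val conn = DriverManager.getConnection(
        "jdbc:postgresql://hostname:5432/dbname", "user", "password")
      try {
        val stmt = conn.prepareStatement("INSERT INTO mytable (col) VALUES (?)")
        rows.foreach { r =>
          stmt.setString(1, r.toString)   // the mapping of r to columns is illustrative
          stmt.executeUpdate()
        }
      } finally {
        conn.close()                       // one connection per partition, closed when done
      }
    }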
Can you add Big Industries to the Powered by Spark page?
Hello, Could you please add Big Industries to the Powered by Spark page at https://cwiki.apache.org/confluence/display/SPARK/Powered+By+Spark ?

Company Name: Big Industries
URL: http://www.bigindustries.be/
Spark Components: Spark Streaming
Use Case: Big Content Platform
Summary: The Big Content Platform is a business-to-business content asset management service providing a searchable, aggregated source of live news feeds, public domain media, and archives of content. The platform is founded on Apache Hadoop, uses the HDFS filesystem, Apache Spark, Titan Distributed Graph Database, HBase, and Solr. Additionally, the platform leverages public datasets like Freebase, DBpedia, Wiktionary, and Geonames to support semantic text enrichment.

Kind regards, Emre Sevinç http://www.bigindustries.be/
Re: Re: Spark streaming doesn't print output when working with standalone master
Thanks Akhil.

From: Akhil Das
Date: 2015-02-20 16:29
To: bit1...@163.com
CC: user
Subject: Re: Re: Spark streaming doesn't print output when working with standalone master

local[3] spawns 3 threads on 1 core :) Thanks Best Regards
Setting the number of executors in standalone mode
Hi there, I am trying to increase the number of executors per worker in standalone mode, and I have failed to achieve that. I followed the instructions in this thread: http://stackoverflow.com/questions/26645293/spark-configuration-memory-instance-cores and did this:

spark.executor.memory 1g
SPARK_WORKER_MEMORY=8g

hoping to get 8 executors per worker, but it's still 1. And the option num-executors is not available in standalone mode. Thanks a lot!
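For what it's worth: in standalone mode of that era, an application gets at most one executor per worker process, so the usual way to get several executors per machine is to run several worker processes via SPARK_WORKER_INSTANCES. A sketch of conf/spark-env.sh on each worker machine, with purely illustrative values:

    # conf/spark-env.sh (values illustrative)
    SPARK_WORKER_INSTANCES=8   # 8 worker JVMs per machine
    SPARK_WORKER_MEMORY=1g     # memory each worker can grant to executors
    SPARK_WORKER_CORES=1       # cores each worker offers

With spark.executor.memory at 1g, each of the 8 workers can then host one 1 GB executor for the application.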
Where to look for potential causes for Akka timeout errors in a Spark Streaming Application?
Hello, We are building a Spark Streaming application that listens to a directory on HDFS and uses the SolrJ library to send newly detected files to a Solr server. When we put 10,000 files into the directory it is listening to, it starts to process them by sending the files to our Solr server, but after about a few thousand files the Spark Streaming application dies.

Before the application dies, it gives some TimeoutException errors related to Akka, such as:

util.AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
akka.pattern.AskTimeoutException: Timed out

Any ideas on how to deal with this? Should we add/change/tweak some Spark configuration parameters? We're using Spark 1.2.0 on a YARN cluster, and we're giving 4 cores and 2 GB of memory to the application when invoking it via the spark-submit command.

Below you can read the last few lines of the log file, showing what our Spark Streaming application logged just before it died:

15/02/20 13:28:25 INFO rdd.NewHadoopRDD: Input split: hdfs://node01.demo.hadoop:8020/user/bjorn/spark-belga/dbpedia-translator-out/2fdf95f1-67d6-40b7-9345-fe129e38a2d9.json:0+2620
15/02/20 13:28:25 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 3004
15/02/20 13:28:32 INFO storage.MemoryStore: ensureFreeSpace(20996) called with curMem=31171148, maxMem=794647
15/02/20 13:28:32 INFO storage.MemoryStore: Block broadcast_3004_piece0 stored as bytes in memory (estimated size 20.5 KB, free 1030.5 MB)
15/02/20 13:28:33 INFO storage.BlockManagerMaster: Updated info of block broadcast_3004_piece0
15/02/20 13:28:33 INFO broadcast.TorrentBroadcast: Reading broadcast variable 3004 took 7897 ms
15/02/20 13:28:33 INFO storage.MemoryStore: ensureFreeSpace(347363) called with curMem=31192144, maxMem=794647
15/02/20 13:28:33 INFO storage.MemoryStore: Block broadcast_3004 stored as values in memory (estimated size 339.2 KB, free 1030.2 MB)
15/02/20 13:28:33 INFO storage.MemoryStore: ensureFreeSpace(2627) called with curMem=31539507, maxMem=794647
15/02/20 13:28:33 INFO storage.MemoryStore: Block rdd_3659_3 stored as bytes in memory (estimated size 2.6 KB, free 1030.2 MB)
15/02/20 13:28:34 INFO storage.BlockManagerMaster: Updated info of block rdd_3659_3
15/02/20 13:28:34 INFO impl.HttpClientUtil: Creating new http client, config: maxConnections=128&maxConnectionsPerHost=32&followRedirects=false
15/02/20 13:28:36 INFO storage.MemoryStore: ensureFreeSpace(5) called with curMem=31542134, maxMem=794647
15/02/20 13:28:36 INFO storage.MemoryStore: Block rdd_3660_3 stored as bytes in memory (estimated size 5.0 B, free 1030.2 MB)
15/02/20 13:28:40 INFO storage.BlockManagerMaster: Updated info of block rdd_3660_3
15/02/20 13:28:40 INFO executor.Executor: Finished task 3.0 in stage 245.0 (TID 3455). 2516 bytes result sent to driver
15/02/20 13:29:07 WARN util.AkkaUtils: Error sending message in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:398)
15/02/20 13:29:08 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkexecu...@node08.demo.hadoop:48042] - [akka.tcp://sparkdri...@node07.demo.hadoop:56535] disassociated! Shutting down.

LogType: stdout
LogLength: 0
Log Contents:

Container: container_1422006251277_0837_01_04 on node08.demo.hadoop_8041
==
LogType: stderr
LogLength: 2952
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/disk1/cm/parcels/CDH-5.3.0-1.cdh5.3.0.p0.30/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/mnt/disk3/yarn/nm/usercache/bjorn/filecache/354/bigcontent-1.0-SNAPSHOT.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/02/20 13:29:26 INFO executor.CoarseGrainedExecutorBackend: Registered signal handlers for [TERM, HUP, INT]
15/02/20 13:29:27 INFO spark.SecurityManager: Changing view acls to: yarn,bjorn
15/02/20 13:29:27 INFO spark.SecurityManager: Changing modify acls to: yarn,bjorn
15/02/20 13:29:27 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn, bjorn); users with modify permissions:
GSOC2015
Hi, Since we're approaching the GSOC2015 application process, I have some questions:

1) Will your organization be a part of GSOC2015, and what are the projects that you would be interested in?
2) Since I'm not a contributor to Apache Spark, what are some starter tasks I can work on to gain facility with the code base?
3) Am I posting this to the right mailing list, or should I post to the dev mailing list?

Thanks a lot. Regards,
Re: storing MatrixFactorizationModel (pyspark)
well, I understand the math (having two vectors), but the python MatrixFactorizationModel object seems to be just a wrapper around the Java class, so I am not sure how to extract the two RDDs? thx, Antony.

On Thursday, 19 February 2015, 16:32, Ilya Ganelin ilgan...@gmail.com wrote:

Yep. The matrix model has two RDD vectors representing the decomposed matrix. You can save these to disk and reuse them.

On Thu, Feb 19, 2015 at 2:19 AM Antony Mayi antonym...@yahoo.com.invalid wrote:

Hi, when getting the model out of ALS.train it would be beneficial to store it (to disk) so the model can be reused later for any following predictions. I am using pyspark, and I had no luck pickling it, either using the standard pickle module or even dill. Does anyone have a solution for this (note it is pyspark)? thank you, Antony.
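In Scala the extraction is direct, since userFeatures and productFeatures are public RDD[(Int, Array[Double])] fields on the model. A sketch (paths and ids are placeholders; the pyspark wrapper of that era may not expose these fields as cleanly, which is the difficulty described above):

    model.userFeatures.saveAsObjectFile("hdfs:///models/als/userFeatures")
    model.productFeatures.saveAsObjectFile("hdfs:///models/als/productFeatures")

    // later: reload the factors and score (userId, productId) by hand via a dot product
    val userFeatures = sc.objectFile[(Int, Array[Double])]("hdfs:///models/als/userFeatures")
    val productFeatures = sc.objectFile[(Int, Array[Double])]("hdfs:///models/als/productFeatures")
    val u = userFeatures.lookup(userId).head
    val p = productFeatures.lookup(productId).head
    val score = u.zip(p).map { case (a, b) => a * b }.sum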
Re: Why is RDD lookup slow?
Thank you all. Just changing the RDD to a Map structure saved me approx. 1 second. Yes, I will check out IndexedRDD to see if it has better performance. best, /Shahab

On Thu, Feb 19, 2015 at 6:38 PM, Burak Yavuz brk...@gmail.com wrote:

If your dataset is large, there is a Spark package called IndexedRDD optimized for lookups. Feel free to check that out. Burak

On Feb 19, 2015 7:37 AM, Ilya Ganelin ilgan...@gmail.com wrote:

Hi Shahab - if your data structures are small enough, a broadcasted Map is going to provide faster lookup. Lookup within an RDD is an O(m) operation, where m is the size of the partition. For RDDs with multiple partitions, executors can operate on it in parallel, so you get some improvement for larger RDDs.

On Thu, Feb 19, 2015 at 7:31 AM shahab shahab.mok...@gmail.com wrote:

Hi, I am doing lookups on cached RDDs [(Int, String)], and I noticed that the lookup is relatively slow, 30-100 ms?? I even tried this on one machine with a single partition, but no difference! The RDDs are not large at all, 3-30 MB. Is this expected behaviour? Should I use other data structures, like a HashMap, to keep the data and look it up there, and use Broadcast to send a copy to all machines? best, /Shahab
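A minimal sketch of the broadcast-Map approach Ilya describes, assuming the pair RDD comfortably fits on the driver:

    // pairs: RDD[(Int, String)], small enough to collect
    val lookupTable = sc.broadcast(pairs.collectAsMap())

    // O(1) hash lookup wherever the broadcast is referenced, driver or executor:
    val value: Option[String] = lookupTable.value.get(42)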
Re: Spark job fails on cluster but works fine on a single machine
I definitely delete the file on the right HDFS; I only have one HDFS instance. The problem seems to be in the CassandraRDD - reading always fails in some way when run on the cluster, but single-machine reads are okay.

On Feb 20, 2015, at 4:20 AM, Ilya Ganelin ilgan...@gmail.com wrote:

The stupid question is whether you're deleting the file from HDFS on the right node?

On Thu, Feb 19, 2015 at 11:31 AM Pavel Velikhov pavel.velik...@gmail.com wrote:

Yeah, I do manually delete the files, but it still fails with this error.

On Feb 19, 2015, at 8:16 PM, Ganelin, Ilya ilya.gane...@capitalone.com wrote:

When writing to HDFS, Spark will not overwrite existing files or directories. You must either manually delete these or use Java's Hadoop FileSystem class to remove them.

Sent with Good (www.good.com)

-Original Message-
From: Pavel Velikhov [pavel.velik...@gmail.com]
Sent: Thursday, February 19, 2015 11:32 AM Eastern Standard Time
To: user@spark.apache.org
Subject: Spark job fails on cluster but works fine on a single machine

I have a simple Spark job that goes out to Cassandra, runs a pipe and stores results:

val sc = new SparkContext(conf)
val rdd = sc.cassandraTable("keyspace", "table")
  .map(r => r.getInt("column") + "\t" + write(get_lemmas(r.getString("tags"))))
  .pipe("python3 /tmp/scripts_and_models/scripts/run.py")
  .map(r => convertStr(r))
  .coalesce(1, true)
  .saveAsTextFile("/tmp/pavel/CassandraPipeTest.txt")
  //.saveToCassandra("keyspace", "table", SomeColumns("id", "data"))

When run on a single machine, everything is fine if I save to an HDFS file or save to Cassandra. When run on the cluster, neither works:

- When saving to file, I get an exception: User class threw exception: Output directory hdfs://hadoop01:54310/tmp/pavel/CassandraPipeTest.txt already exists
- When saving to Cassandra, only 4 rows are updated with empty data (I test on a 4-machine Spark cluster)

Any hints on how to debug this and where the problem could be?

- I delete the HDFS file before running
- Would really like the output to HDFS to work, so I can debug
- Then it would be nice to save to Cassandra
Re: Streaming Linear Regression
Baris, I've tried the following piece of code: https://gist.github.com/emres/10c509c1d69264fe6fdb and built it using sbt package, and then submitted it via:

spark-submit --class org.apache.spark.examples.mllib.StreamingLinearRegression --master local[4] target/scala-2.10/streaminglinearregression_2.10-1.0.jar

And once it started to run, I waited for a few seconds, and then I copied a few files to /home/emre/data/train and observed the log output on my console:

15/02/20 13:08:35 INFO FileInputDStream: Finding new files took 29 ms
15/02/20 13:08:35 INFO FileInputDStream: New files at time 1424434115000 ms:
file:/home/emre/data/train/newsMessageNL14.json
file:/home/emre/data/train/newsMessageNL11.json
file:/home/emre/data/train/newsMessageNL10.json
file:/home/emre/data/train/newsMessageNL6.json
file:/home/emre/data/train/newsMessageNL8.json
file:/home/emre/data/train/newsMessageNL5.json
file:/home/emre/data/train/newsMessageNL1.json
file:/home/emre/data/train/newsMessageNL9.json
file:/home/emre/data/train/newsMessageNL2.json
file:/home/emre/data/train/newsMessageNL16.json
file:/home/emre/data/train/newsMessageNL20.json
file:/home/emre/data/train/newsMessageNL12.json
file:/home/emre/data/train/newsMessageNL4.json
file:/home/emre/data/train/newsMessageNL19.json
file:/home/emre/data/train/newsMessageNL7.json
file:/home/emre/data/train/newsMessageNL17.json
file:/home/emre/data/train/newsMessageNL18.json
file:/home/emre/data/train/newsMessageNL3.json
file:/home/emre/data/train/newsMessageNL13.json
file:/home/emre/data/train/newsMessageNL15.json
15/02/20 13:08:35 INFO MemoryStore: ensureFreeSpace(214074) called with curMem=0, maxMem=278019440

The contents of the JSON files of course don't make sense in this context (building a linear regression model); I've used them only to check whether the system detects new files, and as can be seen above, it does. You can start from the source code I've shared, which detects new files, and continue to build your particular streaming linear regression application. -- Emre Sevinç http://www.bigindustries.be
Re: Spark on Mesos: Multiple Users with iPython Notebooks
Awesome! This is exactly what I'd need. Unfortunately, I am not a programmer of any talent or skill, but how could I assist with this JIRA? From a user perspective, this is really the next step for my org in taking our Mesos cluster to user land with Spark. I don't want to be pushy, but is there any sort of time frame I could possibly communicate to my team? Anything I can do? Thanks!

On Fri, Feb 20, 2015 at 4:36 AM, Iulian Dragoș iulian.dra...@typesafe.com wrote:

Cluster mode support for Spark is tracked under SPARK-5338 (https://issues.apache.org/jira/browse/SPARK-5338). I know Tim Chen is working on it, so there will be progress soon. iulian

-- Iulian Dragos -- Reactive Apps on the JVM www.typesafe.com
what does Submitting ... missing tasks from Stage mean?
Hi, Probably this is a silly question, but I couldn't find any clear documentation explaining why one sees "Submitting ... missing tasks from Stage ..." in the logs. Especially in my case, when I do not have any failure in job execution, I wonder why this should happen. Does it have any relation to lazy evaluation? best, /Shahab
Re: Where to look for potential causes for Akka timeout errors in a Spark Streaming Application?
Hi Emre, Have you tried adjusting these:

.set("spark.akka.frameSize", "500")
.set("spark.akka.askTimeout", "30")
.set("spark.core.connection.ack.wait.timeout", "600")

-Todd

On Fri, Feb 20, 2015 at 8:14 AM, Emre Sevinc emre.sev...@gmail.com wrote:

Hello, We are building a Spark Streaming application that listens to a directory on HDFS and uses the SolrJ library to send newly detected files to a Solr server. When we put 10,000 files into the directory it is listening to, it starts to process them by sending the files to our Solr server, but after about a few thousand files the Spark Streaming application dies.
Re: Spark Streaming and message ordering
For a given batch, for a given partition, the messages will be processed in order by the executor that is running that partition. That's because messages for the given offset range are pulled by the executor, not pushed from some other receiver.

If you have speculative execution, yes, another executor may be running that partition. If your job is lagging behind in processing such that the next batch starts executing before the last batch is finished processing, yes, it is possible for some other executor to start working on messages from that same Kafka partition. The obvious solution here seems to be to turn off speculative execution and adjust your batch interval / sizes such that they can comfortably finish processing :)

If your processing time is sufficiently non-linear with regard to the number of messages, yes, you might be able to do something with overriding dstream.compute. Unfortunately the new Kafka dstream implementation is private, so it's not straightforward to subclass it. I'd like to get a solution in place for people who need to be able to tune the batch generation policy (I need to as well, for unrelated reasons). Maybe you can say a little more about your use case.

But regardless of the technology you're using to read from Kafka (Spark, Storm, whatever), Kafka only gives you ordering with respect to a particular partition. So you're going to need to do some kind of downstream sorting if you really care about a global order.

On Fri, Feb 20, 2015 at 1:43 AM, Neelesh neele...@gmail.com wrote:

Even with the new direct streams in 1.3, isn't it the case that the job *scheduling* follows the partition order, rather than job *execution*? Or is it the case that the stream listens to the job completion event (using a StreamingListener) before scheduling the next batch? To compare with Storm from a message-ordering point of view: unless a tuple is fully processed by the DAG (as defined by spout+bolts), the next tuple does not enter the DAG.

On Thu, Feb 19, 2015 at 9:47 PM, Cody Koeninger c...@koeninger.org wrote:

Kafka ordering is guaranteed on a per-partition basis. The high-level consumer API as used by the Spark Kafka streams prior to 1.3 will consume from multiple Kafka partitions, thus not giving any ordering guarantees. The experimental direct stream in 1.3 uses the simple consumer API, and there is a 1:1 correspondence between Spark partitions and Kafka partitions. So you will get deterministic ordering, but only on a per-partition basis.

On Thu, Feb 19, 2015 at 11:31 PM, Neelesh neele...@gmail.com wrote:

I had a chance to talk to TD today at the Strata+Hadoop conference in San Jose. We talked a bit about this after his presentation - the short answer is that Spark Streaming does not guarantee any sort of ordering (within batches, across batches). One would have to use updateStateByKey to collect the events and sort them based on some attribute of the event. But TD said message ordering is a frequently requested feature recently and it is getting on his radar.

I went through the source code, and there does not seem to be any architectural/design limitation to supporting this. (JobScheduler and JobGenerator are a good starting point to see how stuff works under the hood.) Overriding DStream#compute and using a StreamingListener look like a simple way of ensuring ordered execution of batches within a stream. But this would be a partial solution, since ordering within a batch needs some more work that I don't understand fully yet.

Side note: my custom receiver polls the MetricsServlet once in a while to decide whether jobs are getting done fast enough, and throttles/relaxes pushing data into receivers based on the numbers the MetricsServlet provides. I had to do this because out-of-the-box rate limiting right now is static and cannot adapt to the state of the cluster. thnx -neelesh

On Wed, Feb 18, 2015 at 4:13 PM, jay vyas jayunit100.apa...@gmail.com wrote:

This is a *fantastic* question. The idea of how we identify individual things in multiple DStreams is worth looking at. The reason being that you can then fine-tune your streaming job based on the RDD identifiers (i.e. are the timestamps from the producer correlating closely to the order in which RDD elements are being produced?). If *NO*, then you need to (1) dial up throughput on producer sources or else (2) increase cluster size so that Spark is capable of evenly handling load. You can't decide to do (1) or (2) unless you can track when the streaming elements are being converted to RDDs by Spark itself.

On Wed, Feb 18, 2015 at 6:54 PM, Neelesh neele...@gmail.com wrote:

There does not seem to be a definitive answer on this. Every time I google for message ordering, the only relevant thing that comes up is this: http://samza.apache.org/learn/documentation/0.8/comparisons/spark-streaming.html . With a Kafka receiver that pulls data from a single Kafka partition of a Kafka topic,
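A minimal sketch of the updateStateByKey approach mentioned above (Event, its ts field, and eventsByKey are illustrative names; a real job would also need to trim the per-key buffer so the state does not grow without bound):

    import org.apache.spark.streaming.StreamingContext._   // pair-DStream implicits in the 1.x era

    case class Event(ts: Long, payload: String)

    // eventsByKey: DStream[(String, Event)]
    val ordered = eventsByKey.updateStateByKey[Seq[Event]] {
      (newEvents: Seq[Event], state: Option[Seq[Event]]) =>
        // append this batch's events and keep the buffer sorted by event time
        Some((state.getOrElse(Seq.empty) ++ newEvents).sortBy(_.ts))
    }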
How Spark and Flink are shaping the future of Hadoop?
Hi, 1. *To get a taste* of my talk at the 2015 Hadoop Summit, please find below a few links to a similar talk that I gave at the Chicago Hadoop Users Group on '*Transitioning Compute Models: Apache MapReduce to Spark*' on February 12, 2015 in front of 185 attendees:

- Video Recording: http://goo.gl/f30eEn
- Slides: http://goo.gl/Ikx4Ud
- Blog Entry: http://goo.gl/Pc6qiz

2. To *vote* for my proposal at the 2015 Hadoop Summit on '*How Spark and Flink are shaping the future of Hadoop*?', simply visit http://goo.gl/qfqSR9 , click on 'Vote', pick 3 votes and enter your name and email. Done in less than 30 seconds!

Thanks in advance for your help. Slim Baltagi Sr. Big Data Architect http://ww.SparkBigData.com
Re: loads of memory still GC overhead limit exceeded
Hi Antony, Is it easy for you to try Spark 1.3.0 or master? The ALS performance should be improved in 1.3.0. -Xiangrui

On Fri, Feb 20, 2015 at 1:32 PM, Antony Mayi antonym...@yahoo.com.invalid wrote:

Hi Ilya, thanks for your insight, this was the right clue. I had default parallelism already set, but it was quite low (hundreds), and moreover the number of partitions of the input RDD was low as well, so the chunks were really too big. Increased parallelism and repartitioning seem to be helping... Thanks! Antony.

On Thursday, 19 February 2015, 16:45, Ilya Ganelin ilgan...@gmail.com wrote:

Hi Antony - you are seeing a problem that I ran into. The underlying issue is your default parallelism setting. What's happening is that within ALS certain RDD operations end up changing the number of partitions you have of your data. For example, if you start with an RDD of 300 partitions, unless default parallelism is set, while the algorithm executes you'll eventually get an RDD with something like 20 partitions. Consequently, your giant data set is now stored across a much smaller number of partitions, so each partition is huge. Then, when a shuffle requires serialization, you run out of heap space trying to serialize it. The solution should be as simple as setting the default parallelism setting. This is referenced in a JIRA I can't find at the moment.

On Thu, Feb 19, 2015 at 5:10 AM Antony Mayi antonym...@yahoo.com.invalid wrote:

now with reverted spark.shuffle.io.preferDirectBufs (to true), getting again GC overhead limit exceeded:

=== spark stdout ===
15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

=== yarn log (same) ===
15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage 18.0 (TID 5329)
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

=== yarn nodemanager ===
2015-02-19 12:08:13,758 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19014 for container-id container_1424204221358_0013_01_12: 29.8 GB of 32 GB physical memory used; 31.7 GB of 67.2 GB virtual memory used
2015-02-19 12:08:13,778 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19013 for container-id container_1424204221358_0013_01_08: 1.2 MB of 32 GB physical memory used; 103.6 MB of 67.2 GB virtual memory used
2015-02-19 12:08:14,455 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0013_01_08 is : 143
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0013_01_08 transitioned from RUNNING to EXITED_WITH_FAILURE
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0013_01_08

Antony.

On Thursday, 19 February 2015, 11:54, Antony Mayi antonym...@yahoo.com.INVALID wrote:

it is from within the ALS.trainImplicit() call. btw. the exception varies between this GC overhead limit exceeded and Java heap space (which I guess is just a different outcome of the same problem). just tried another run and here are the logs (filtered) - note I tried this run with spark.shuffle.io.preferDirectBufs=false, so this might be a slightly different issue from my previous case (going to revert now):

=== spark stdout ===
15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart beats: 50221ms exceeds 45000ms
15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart
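A sketch of the fix Antony describes - raising default parallelism and keeping the input partitioning and the ALS block count in the same ballpark (600 is illustrative; rank, iterations, lambda, and alpha stand for the values already in use):

    val conf = new SparkConf().set("spark.default.parallelism", "600")

    val ratings = rawRatings.repartition(600)   // avoid a handful of giant partitions
    val model = ALS.trainImplicit(ratings, rank, iterations, lambda, 600, alpha)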
randomSplit instead of a huge map reduce ?
Hi, I am new to Spark and I think I missed something very basic. I have the following use case (I use Java and run Spark locally on my laptop): I have a JavaRDD<String[]> - The RDD contains around 72,000 arrays of strings (String[]) - Each array contains 80 words (on average). What I want to do is to convert each array into a new array/list of pairs, for example:

Input: String[] words = ['a', 'b', 'c']
Output: List[(String, String)] pairs = [('a', 'b'), ('a', 'c'), ('b', 'c')]

and then I want to count the number of times each pair appeared, so my final output should be something like:

Output: List[(String, String, Integer)] result = [('a', 'b', 3), ('a', 'c', 8), ('b', 'c', 10)]

The problem: Since each array contains around 80 words, it yields around 3,200 pairs, so after "mapping" my entire RDD I get 3,200 * 72,000 = *230,400,000* pairs to reduce, which requires way too much memory. (I know I have only around *20,000,000* unique pairs!) I already modified my code and used 'mapPartitions' instead of 'map'. It definitely improved the performance, but I still feel I'm doing something completely wrong. I was wondering if this is the right 'Spark way' to solve this kind of problem, or maybe I should do something like splitting my original RDD into smaller parts (by using randomSplit), then iterate over each part, aggregate the results into some result RDD (by using 'union') and move on to the next part. Can anyone please explain to me which solution is better? Thank you very much, Shlomi.
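No randomSplit loop should be needed: reduceByKey aggregates map-side before shuffling, so the ~230 million intermediate pairs are collapsed toward the unique pairs within each partition before anything crosses the network, and only roughly the ~20 million unique pairs are shuffled. A sketch in Scala (arrays is an illustrative name for the RDD of word arrays):

    // arrays: RDD[Array[String]] with ~72,000 arrays of ~80 words each
    val pairCounts = arrays.flatMap { words =>
      for {
        i <- 0 until words.length
        j <- (i + 1) until words.length
      } yield ((words(i), words(j)), 1)
    }.reduceByKey(_ + _)   // map-side combining keeps memory pressure per partition bounded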
Re: output worker stdout to one place
Thanks Marcelo! I will try to change the log4j.properties.

On Fri, Feb 20, 2015 at 11:37 AM, Marcelo Vanzin van...@cloudera.com wrote:

Hi Anny, You could play with creating your own log4j.properties that will write the output somewhere else (e.g. to some remote mount, or remote syslog). Sorry, but I don't have an example handy. Alternatively, if you can use Yarn, it will collect all logs after the job is finished and make them available as a single file using the yarn logs command.

On Fri, Feb 20, 2015 at 11:31 AM, anny9699 anny9...@gmail.com wrote:

Hi, I am wondering if there's some way to send some of the worker stdout to one place instead of to each worker's stdout. For example, I have the following code:

RDD.foreach { line =>
  try {
    // do something
  } catch {
    case e: Exception => println(line)
  }
}

Every time I want to check what's causing the exception, I have to check one worker after another in the UI, because I don't know which worker will be dealing with the exception case. Is there a way that the println could print to one place instead of separate worker stdouts, so that I only need to check one place? Thanks a lot! Anny

-- Marcelo
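Since no example was handy above, here is a hedged sketch of such a log4j.properties that additionally ships executor logs to a remote syslog host (the host and facility are placeholders). Note that println writes to stdout and bypasses log4j entirely, so the catch block would need a logger call such as org.apache.log4j.Logger.getLogger("app").warn(line) for this to capture the lines:

    # log4j.properties (sketch; host and facility are placeholders)
    log4j.rootCategory=INFO, syslog
    log4j.appender.syslog=org.apache.log4j.net.SyslogAppender
    log4j.appender.syslog.SyslogHost=loghost.example.com
    log4j.appender.syslog.Facility=LOCAL1
    log4j.appender.syslog.layout=org.apache.log4j.PatternLayout
    log4j.appender.syslog.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

One common way to ship it to the executors is --files log4j.properties together with --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties, though the exact mechanism depends on the cluster manager.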
Re: using hivecontext with sparksql on cdh 5.3
Correction, should be:
HADOOP_CONF_DIR=/etc/hive/conf spark-shell --driver-class-path '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'
On Fri, Feb 20, 2015 at 3:48 PM, Sourigna Phetsarath gna.phetsar...@teamaol.com wrote: Correction, should be:
HADOOP_CONF_DIR=/etc/hive/conf --driver-class-path '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'
On Fri, Feb 20, 2015 at 3:43 PM, Sourigna Phetsarath gna.phetsar...@teamaol.com wrote: Also, you might want to add the hadoop configs:
HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf --driver-class-path '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'
Ok. It needs the CDH configs for hive and hadoop. Hopefully this works for you.
On Fri, Feb 20, 2015 at 3:41 PM, chirag lakhani chirag.lakh...@gmail.com wrote: Thanks! I am able to login to Spark now but I am still getting the same error
scala> sqlContext.sql("FROM analytics.trainingdatafinal SELECT *").collect().foreach(println)
15/02/20 14:40:22 INFO ParseDriver: Parsing command: FROM analytics.trainingdatafinal SELECT *
15/02/20 14:40:22 INFO ParseDriver: Parse Completed
15/02/20 14:40:23 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/02/20 14:40:23 INFO ObjectStore: ObjectStore, initialize called
15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus.api.jdo is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-api-jdo-3.2.6.jar is already registered, and you are trying to register an identical plugin located at URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-api-jdo-3.2.6.jar.
15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus.store.rdbms is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-rdbms-3.2.9.jar is already registered, and you are trying to register an identical plugin located at URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-rdbms-3.2.9.jar.
15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-core-3.2.10.jar is already registered, and you are trying to register an identical plugin located at URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-core-3.2.10.jar.
15/02/20 14:40:23 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored 15/02/20 14:40:23 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored 15/02/20 14:40:25 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order 15/02/20 14:40:25 INFO MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5. Encountered: @ (64), after : . 15/02/20 14:40:27 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 15/02/20 14:40:27 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 15/02/20 14:40:28 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 15/02/20 14:40:28 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 15/02/20 14:40:28 INFO Query: Reading in results for query org.datanucleus.store.rdbms.query.SQLQuery@0 since the connection used is closing 15/02/20 14:40:28 INFO ObjectStore: Initialized ObjectStore 15/02/20 14:40:28 INFO HiveMetaStore: Added admin role in metastore 15/02/20 14:40:28 INFO HiveMetaStore: Added public role in metastore 15/02/20 14:40:29 INFO HiveMetaStore: No user is added in admin role, since config is empty 15/02/20 14:40:29 INFO SessionState: No Tez session required at this point. hive.execution.engine=mr. 15/02/20 14:40:29 INFO HiveMetaStore: 0: get_table : db=analytics tbl=trainingdatafinal
Re: using hivecontext with sparksql on cdh 5.3
Correction, should be HADOOP_CONF_DIR=/etc/hive/conf --driver-class-path '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/ parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' On Fri, Feb 20, 2015 at 3:43 PM, Sourigna Phetsarath gna.phetsar...@teamaol.com wrote: Also, you might want to add the hadoop configs: HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf --driver-class-path '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/ parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' Ok. It needs the CDH configs for hive and hadoop. Hopefully this works for you. On Fri, Feb 20, 2015 at 3:41 PM, chirag lakhani chirag.lakh...@gmail.com wrote: Thanks! I am able to login to Spark now but I am still getting the same error scala sqlContext.sql(FROM analytics.trainingdatafinal SELECT *).collect().foreach(println) 15/02/20 14:40:22 INFO ParseDriver: Parsing command: FROM analytics.trainingdatafinal SELECT * 15/02/20 14:40:22 INFO ParseDriver: Parse Completed 15/02/20 14:40:23 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/02/20 14:40:23 INFO ObjectStore: ObjectStore, initialize called 15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus.api.jdo is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-api-jdo-3.2.6.jar is already registered, and you are trying to register an identical plugin located at URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-api-jdo-3.2.6.jar. 15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus.store.rdbms is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-rdbms-3.2.9.jar is already registered, and you are trying to register an identical plugin located at URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-rdbms-3.2.9.jar. 15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-core-3.2.10.jar is already registered, and you are trying to register an identical plugin located at URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-core-3.2.10.jar. 15/02/20 14:40:23 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored 15/02/20 14:40:23 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored 15/02/20 14:40:25 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order 15/02/20 14:40:25 INFO MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5. Encountered: @ (64), after : . 15/02/20 14:40:27 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 
15/02/20 14:40:27 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 15/02/20 14:40:28 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 15/02/20 14:40:28 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 15/02/20 14:40:28 INFO Query: Reading in results for query org.datanucleus.store.rdbms.query.SQLQuery@0 since the connection used is closing 15/02/20 14:40:28 INFO ObjectStore: Initialized ObjectStore 15/02/20 14:40:28 INFO HiveMetaStore: Added admin role in metastore 15/02/20 14:40:28 INFO HiveMetaStore: Added public role in metastore 15/02/20 14:40:29 INFO HiveMetaStore: No user is added in admin role, since config is empty 15/02/20 14:40:29 INFO SessionState: No Tez session required at this point. hive.execution.engine=mr. 15/02/20 14:40:29 INFO HiveMetaStore: 0: get_table : db=analytics tbl=trainingdatafinal 15/02/20 14:40:29 INFO audit: ugi=hdfs ip=unknown-ip-addr cmd=get_table : db=analytics tbl=trainingdatafinal 15/02/20 14:40:29 ERROR Hive: NoSuchObjectException(message:analytics.trainingdatafinal table not found) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1569) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
Re: high GC in the Kmeans algorithm
A single vector of size 10^7 won't hit that bound. How many clusters did you set? The broadcast variable size is 10^7 * k and you can calculate the amount of memory it needs. Try to reduce the number of tasks and see whether it helps. -Xiangrui
On Tue, Feb 17, 2015 at 7:20 PM, lihu lihu...@gmail.com wrote: Thanks for your answer. Yes, I cached the data; I can observe from the WebUI that all the data is cached in memory. What I worry about is the dimension, not the total size. Sean Owen once answered me that broadcast supports a maximum array size of 2GB, so isn't 10^7 a little large?
On Wed, Feb 18, 2015 at 5:43 AM, Xiangrui Meng men...@gmail.com wrote: Did you cache the data? Was it fully cached? The k-means implementation doesn't create many temporary objects. I guess you need more RAM to avoid GC being triggered frequently. Please monitor the memory usage using YourKit or VisualVM. -Xiangrui
On Wed, Feb 11, 2015 at 1:35 AM, lihu lihu...@gmail.com wrote: I just want to make the best use of CPU, and test the performance of Spark when there are a lot of tasks on a single node.
On Wed, Feb 11, 2015 at 5:29 PM, Sean Owen so...@cloudera.com wrote: Good, worth double-checking that's what you got. That's barely 1GB per task though. Why run 48 if you have 24 cores?
On Wed, Feb 11, 2015 at 9:03 AM, lihu lihu...@gmail.com wrote: I give 50GB to the executor, so it seems that there is no reason the memory is not enough.
On Wed, Feb 11, 2015 at 4:50 PM, Sean Owen so...@cloudera.com wrote: Meaning, you have 128GB per machine but how much memory are you giving the executors?
On Wed, Feb 11, 2015 at 8:49 AM, lihu lihu...@gmail.com wrote: What do you mean? Yes, I can see there is some data put in memory from the web UI.
On Wed, Feb 11, 2015 at 4:25 PM, Sean Owen so...@cloudera.com wrote: Are you actually using that memory for executors?
On Wed, Feb 11, 2015 at 8:17 AM, lihu lihu...@gmail.com wrote: Hi, I run the kmeans (MLlib) in a cluster with 12 workers. Every worker owns 128G RAM and 24 cores, and I run 48 tasks on one machine. The total data is just 40GB. When the dimension of the data set is about 10^7, for every task the duration is about 30s, but the cost for GC is about 20s. When I reduce the dimension to 10^4, the GC time is small. So why is GC so high when the dimension is larger? Or is this caused by MLlib?
-- Best Wishes! Li Hu(李浒) | Graduate Student Institute for Interdisciplinary Information Sciences(IIIS) Tsinghua University, China Email: lihu...@gmail.com Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
-- Best Wishes! Li Hu(李浒) | Graduate Student Institute for Interdisciplinary Information Sciences(IIIS) Tsinghua University, China Email: lihu...@gmail.com Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
-- Best Wishes! Li Hu(李浒) | Graduate Student Institute for Interdisciplinary Information Sciences(IIIS) Tsinghua University, China Email: lihu...@gmail.com Homepage: http://iiis.tsinghua.edu.cn/zh/lihu/
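To put numbers on Xiangrui's point: a single dense center of dimension 10^7 is only about 80 MB of doubles, comfortably under the 2GB array bound, but the broadcast in k-means carries all k centers. A back-of-envelope sketch (the value of k is assumed; the thread never states it):

val dim = 1e7                        // feature dimension from the thread
val k = 500                          // hypothetical cluster count
val oneCenterGB = dim * 8 / 1e9      // ~0.08 GB per dense vector of Doubles
val allCentersGB = dim * k * 8 / 1e9 // ~40 GB for k = 500; ~8 GB even for k = 100
println(f"one center: $oneCenterGB%.2f GB, all centers: $allCentersGB%.1f GB")

Every task deserializes that broadcast each iteration, which is consistent with GC cost growing with the dimension rather than with the 40GB input size.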
Re: loads of memory still GC overhead limit exceeded
No problem, Antony. ML lib is tricky! I'd love to chat with you about your use case - sounds like we're working on similar problems/scales. On Fri, Feb 20, 2015 at 1:55 PM Xiangrui Meng men...@gmail.com wrote: Hi Antony, Is it easy for you to try Spark 1.3.0 or master? The ALS performance should be improved in 1.3.0. -Xiangrui On Fri, Feb 20, 2015 at 1:32 PM, Antony Mayi antonym...@yahoo.com.invalid wrote: Hi Ilya, thanks for your insight, this was the right clue. I had default parallelism already set but it was quite low (hundreds) and moreover the number of partitions of the input RDD was low as well so the chunks were really too big. Increased parallelism and repartitioning seems to be helping... Thanks! Antony. On Thursday, 19 February 2015, 16:45, Ilya Ganelin ilgan...@gmail.com wrote: Hi Anthony - you are seeing a problem that I ran into. The underlying issue is your default parallelism setting. What's happening is that within ALS certain RDD operations end up changing the number of partitions you have of your data. For example if you start with an RDD of 300 partitions, unless default parallelism is set while the algorithm executes you'll eventually get an RDD with something like 20 partitions. Consequently, your giant data set is now stored across a much smaller number of partitions so each partition is huge. Then, when a shuffle requires serialization you run out of heap space trying to serialize it. The solution should be as simple as setting the default parallelism setting. This is referenced in a JIRA I can't find at the moment. On Thu, Feb 19, 2015 at 5:10 AM Antony Mayi antonym...@yahoo.com.invalid wrote: now with reverted spark.shuffle.io.preferDirectBufs (to true) getting again GC overhead limit exceeded: === spark stdout === 15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead limit exceeded at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject( ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream. java:371) at org.apache.spark.serializer.JavaDeserializationStream. readObject(JavaSerializer.scala:62) === yarn log (same) === 15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage 18.0 (TID 5329) java.lang.OutOfMemoryError: GC overhead limit exceeded at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject( ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream. java:371) at org.apache.spark.serializer.JavaDeserializationStream. readObject(JavaSerializer.scala:62) === yarn nodemanager === 2015-02-19 12:08:13,758 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager. monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19014 for container-id container_1424204221358_0013_01_12: 29.8 GB of 32 GB physical memory used; 31.7 GB of 67.2 GB virtual memory used 2015-02-19 12:08:13,778 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager. 
monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19013 for container-id container_1424204221358_0013_01_08: 1.2 MB of 32 GB physical memory used; 103.6 MB of 67.2 GB virtual memory used 2015-02-19 12:08:14,455 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0013_01_08 is : 143 2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager. container.Container: Container container_1424204221358_0013_01_08 transitioned from RUNNING to EXITED_WITH_FAILURE 2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager. launcher.ContainerLaunch: Cleaning up container container_1424204221358_0013_01_08 Antony. On Thursday, 19 February 2015, 11:54, Antony Mayi antonym...@yahoo.com.INVALID wrote: it is from within the ALS.trainImplicit() call. btw. the exception varies between this GC overhead limit exceeded and Java heap space (which I guess is just different outcome of same problem). just tried another run and here are the logs (filtered) - note I tried this run with spark.shuffle.io.preferDirectBufs=false so this might be slightly different issue from my previous case (going to
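A sketch of the fix Ilya describes, for readers hitting the same wall (the parallelism value, input path, and ALS hyperparameters are illustrative, not taken from the thread):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

val conf = new SparkConf()
  .setAppName("ALSWithParallelism")
  .set("spark.default.parallelism", "1000") // keeps partition counts from collapsing inside ALS
val sc = new SparkContext(conf)

val ratings = sc.textFile("hdfs:///path/to/ratings") // hypothetical input
  .map { line =>
    val Array(user, item, rating) = line.split(',')
    Rating(user.toInt, item.toInt, rating.toDouble)
  }
  .repartition(1000) // smaller, evenly sized chunks so no single partition is too big to serialize

val model = ALS.trainImplicit(ratings, 50, 10, 0.01, 40.0) // rank, iterations, lambda, alpha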
Re: using hivecontext with sparksql on cdh 5.3
That worked perfectly...thanks so much! On Fri, Feb 20, 2015 at 3:49 PM, Sourigna Phetsarath gna.phetsar...@teamaol.com wrote: Correction, should be HADOOP_CONF_DIR=/etc/hive/conf spark-shell --driver-class-path '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/ parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' On Fri, Feb 20, 2015 at 3:48 PM, Sourigna Phetsarath gna.phetsar...@teamaol.com wrote: Correction, should be HADOOP_CONF_DIR=/etc/hive/conf --driver-class-path '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/ parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' On Fri, Feb 20, 2015 at 3:43 PM, Sourigna Phetsarath gna.phetsar...@teamaol.com wrote: Also, you might want to add the hadoop configs: HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf --driver-class-path '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/ parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' Ok. It needs the CDH configs for hive and hadoop. Hopefully this works for you. On Fri, Feb 20, 2015 at 3:41 PM, chirag lakhani chirag.lakh...@gmail.com wrote: Thanks! I am able to login to Spark now but I am still getting the same error scala sqlContext.sql(FROM analytics.trainingdatafinal SELECT *).collect().foreach(println) 15/02/20 14:40:22 INFO ParseDriver: Parsing command: FROM analytics.trainingdatafinal SELECT * 15/02/20 14:40:22 INFO ParseDriver: Parse Completed 15/02/20 14:40:23 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/02/20 14:40:23 INFO ObjectStore: ObjectStore, initialize called 15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus.api.jdo is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-api-jdo-3.2.6.jar is already registered, and you are trying to register an identical plugin located at URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-api-jdo-3.2.6.jar. 15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus.store.rdbms is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-rdbms-3.2.9.jar is already registered, and you are trying to register an identical plugin located at URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-rdbms-3.2.9.jar. 15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-core-3.2.10.jar is already registered, and you are trying to register an identical plugin located at URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-core-3.2.10.jar. 
15/02/20 14:40:23 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored 15/02/20 14:40:23 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored 15/02/20 14:40:25 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order 15/02/20 14:40:25 INFO MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5. Encountered: @ (64), after : . 15/02/20 14:40:27 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 15/02/20 14:40:27 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 15/02/20 14:40:28 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 15/02/20 14:40:28 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 15/02/20 14:40:28 INFO Query: Reading in results for query org.datanucleus.store.rdbms.query.SQLQuery@0 since the connection used is closing 15/02/20 14:40:28 INFO ObjectStore: Initialized ObjectStore 15/02/20 14:40:28 INFO HiveMetaStore: Added admin role in metastore 15/02/20 14:40:28 INFO HiveMetaStore: Added public role in metastore 15/02/20 14:40:29 INFO HiveMetaStore: No user is added in admin role, since config is empty 15/02/20 14:40:29 INFO SessionState: No Tez session
Re: Spark Performance on Yarn
Hi Sandy, I appreciate your clear explanation. Let me try again; it's the best way to confirm I understand.
spark.executor.memory + spark.yarn.executor.memoryOverhead = the memory that YARN will create the JVM with
spark.executor.memory = the memory I can actually use in my JVM application
= part of it (spark.storage.memoryFraction) is reserved for caching
+ part of it (spark.shuffle.memoryFraction) is reserved for shuffling
+ the remainder is for bookkeeping and UDFs
If I am correct above, then one implication is: (spark.executor.memory + spark.yarn.executor.memoryOverhead) * number of executors per machine should be configured smaller than a single machine's physical memory. Right? Again, thanks! Kelvin
On Fri, Feb 20, 2015 at 11:50 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Kelvin, spark.executor.memory controls the size of the executor heaps. spark.yarn.executor.memoryOverhead is the amount of memory to request from YARN beyond the heap size. This accounts for the fact that JVMs use some non-heap memory. The Spark heap is divided into spark.storage.memoryFraction (default 0.6) and spark.shuffle.memoryFraction (default 0.2), and the rest is for basic Spark bookkeeping and anything the user does inside UDFs. -Sandy
On Fri, Feb 20, 2015 at 11:44 AM, Kelvin Chu 2dot7kel...@gmail.com wrote: Hi Sandy, I am also doing memory tuning on YARN. Just want to confirm, is it correct to say: spark.executor.memory - spark.yarn.executor.memoryOverhead = the memory I can actually use in my jvm application If it is not, what is the correct relationship? Any other variables or config parameters in play? Thanks. Kelvin
On Fri, Feb 20, 2015 at 9:45 AM, Sandy Ryza sandy.r...@cloudera.com wrote: If that's the error you're hitting, the fix is to boost spark.yarn.executor.memoryOverhead, which will put some extra room in between the executor heap sizes and the amount of memory requested for them from YARN. -Sandy
On Fri, Feb 20, 2015 at 9:40 AM, lbierman leebier...@gmail.com wrote: A bit more context on this issue, from the container logs on the executor. Given my cluster specs above, what would be appropriate parameters to pass in: --num-executors --num-cores --executor-memory? I had tried it with --executor-memory 2500MB.
2015-02-20 06:50:09,056 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=23320,containerID=container_1423083596644_0238_01_004160] is running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing container.
Dump of the process-tree for container_1423083596644_0238_01_004160 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash -c /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms2400m -Xmx2400m -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/CoarseGrainedScheduler 8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout 2 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr
|- 23323 23320 23320 23320 (java) 922271 12263 461976 724218 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms2400m -Xmx2400m -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p21739.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
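As a rough check against the kill message above, the numbers line up with Sandy's model; the overhead formula sketched here, max(384 MB, 7% of executor memory), is from memory of the 1.2-era YARN integration, so treat the exact constants as an assumption:

heap requested        -Xmx2400m (visible in the container command line)
default overhead      max(384, 0.07 * 2400) = 384 MB
YARN container limit  2400 + 384 = 2784 MB, i.e. the "2.7 GB" in the warning
observed usage        2.8 GB -> container killed

Which is why a bump along the lines of --conf spark.yarn.executor.memoryOverhead=1024 gives the JVM's non-heap side real headroom instead of a few hundred megabytes.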
Re: using hivecontext with sparksql on cdh 5.3
Try it without --master yarn-cluster if you are trying to run a spark-shell. :)
On Fri, Feb 20, 2015 at 3:18 PM, chirag lakhani chirag.lakh...@gmail.com wrote: I tried
spark-shell --master yarn-cluster --driver-class-path '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'
and I get the following error
Error: Cluster deploy mode is not applicable to Spark shells. Run with --help for usage help or --verbose for debug output
On Fri, Feb 20, 2015 at 2:52 PM, Sourigna Phetsarath gna.phetsar...@teamaol.com wrote: Chirag, This worked for us:
spark-submit --master yarn-cluster --driver-class-path '/opt/cloudera/parcels/CDH/lib/hive/lib/*' --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hive/lib/*' ...
Let me know, if you have any issues.
On Fri, Feb 20, 2015 at 2:43 PM, chirag lakhani chirag.lakh...@gmail.com wrote: I am trying to access a hive table using spark sql but I am having trouble. I followed the instructions in a cloudera community board, which stated:
1) Import hive jars into the class path
export SPARK_CLASSPATH=$(find /data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/ -name '*.jar' -print0 | sed 's/\x0/:/g')
2) start the spark shell
spark-shell
3) create a hive context
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
4) then run the query
sqlContext.sql("FROM analytics.trainingdatafinal SELECT *").collect().foreach(println)
When I do this it seems that it cannot find the table in the hive metastore. I have put all of my cloudera parcels in the partition starting with /data as opposed to the default location used by cloudera. Any suggestions on what can be done?
I am putting the error below 15/02/20 13:43:01 ERROR Hive: NoSuchObjectException(message:analytics.trainingdatafinal table not found) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1569) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:106) at com.sun.proxy.$Proxy24.get_table(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1008) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:90) at com.sun.proxy.$Proxy25.getTable(Unknown Source) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:1000) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:974) at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70) at org.apache.spark.sql.hive.HiveContext$$anon$2.org $apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141) at org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at
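For readers skimming this thread: the combination confirmed to work elsewhere in the thread was pointing the driver at the Hive config and putting the CDH Hive jars on both driver and executor classpaths, roughly:

HADOOP_CONF_DIR=/etc/hive/conf spark-shell \
  --driver-class-path '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' \
  --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'

and then, inside the shell:

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("FROM analytics.trainingdatafinal SELECT *").collect().foreach(println)

The NoSuchObjectException here is the symptom of the shell talking to an empty embedded metastore rather than the cluster's: without HADOOP_CONF_DIR pointing at hive-site.xml, HiveContext never learns where the real metastore lives.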
Re: Spark 1.3 SQL Programming Guide and sql._ / sql.types._
Oh no worries at all. If you want, I'd be glad to make updates and a PR for anything I find, eh?!
On Fri, Feb 20, 2015 at 12:18 Michael Armbrust mich...@databricks.com wrote: Yeah, sorry. The programming guide has not been updated for 1.3. I'm hoping to get to that this weekend / next week.
On Fri, Feb 20, 2015 at 9:55 AM, Denny Lee denny.g@gmail.com wrote: Quickly reviewing the latest SQL Programming Guide https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md (in github) I had a couple of quick questions:
1) Do we need to instantiate the SparkContext as per
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
Within Spark 1.3 the sqlContext is already available, so we probably do not need to make this call.
2) Importing org.apache.spark.sql._ should bring in both SQL data types, struct types, and Row:
// Import Spark SQL data types and Row.
import org.apache.spark.sql._
Currently with Spark 1.3 RC1, it appears org.apache.spark.sql._ only brings in Row.
scala> import org.apache.spark.sql._
import org.apache.spark.sql._
scala> val schema =
| StructType(
| schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
<console>:25: error: not found: value StructType
StructType(
But if I also import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
scala> val schema =
| StructType(
| schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(DeviceMake,StringType,true), StructField(Country,StringType,true))
Wondering if this is by design or perhaps a quick documentation / package update is warranted.
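Spelled out, the working combination Denny reports for Spark 1.3 RC1 is the pair of imports below (the schemaString value simply echoes the fields visible in the REPL output above):

import org.apache.spark.sql._        // Row, SQLContext, DataFrame
import org.apache.spark.sql.types._  // StructType, StructField, StringType

val schemaString = "DeviceMake Country"
val schema = StructType(
  schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))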
Spark performance tuning
Hi, I am new to Spark, and I am trying to test Spark SQL performance vs Hive. I set up a standalone box with 24 cores and 64G memory. We have one SQL query in mind to test. Here is the basic setup on this one box for the SQL we are trying to run:
1) Dataset1: 6.6G AVRO file with snappy compression, which contains a nested structure of 3 arrays of struct in AVRO
2) Dataset2: 5G AVRO file with snappy compression
3) Dataset3: 2.3M AVRO file with snappy compression
The basic structure of the query is like this:
(select xxx from dataset1 lateral view outer explode(struct1) lateral view outer explode(struct2) where x)
left outer join
(select from dataset2 lateral view explode(xxx) where )
on
left outer join
(select xxx from dataset3 where )
on x
So overall what it does is 2 outer explodes on dataset1, a left outer join with an explode of dataset2, then finally a left outer join with dataset3. On this standalone box, I installed Hadoop 2.2, Hive 0.12, and Spark 1.2.0. As a baseline, the above query finishes in around 50 minutes in Hive 0.12, with 6 mappers and 3 reducers, each with 1G max heap, in 3 rounds of MR jobs. This is a very expensive query running in our production, of course with a much bigger data set, every day. Now I want to see how fast Spark can run the same query. I am using the following settings, based on my understanding of Spark, for a fair test between it and Hive:
export SPARK_WORKER_MEMORY=32g
export SPARK_DRIVER_MEMORY=2g
--executor-memory 9g --total-executor-cores 9
I am trying to run one executor with 9 cores and a max 9G heap, to make Spark use almost the same resources we gave to MapReduce. Here is the result without any additional configuration changes, running under Spark 1.2.0, using HiveContext in Spark SQL, to run exactly the same query. Spark SQL generated 5 stages of tasks, shown below (stage id, description, submitted, duration, tasks, and the data volumes as reported in the UI):
4  collect at SparkPlan.scala:84       2015/02/20 10:48:46  26 s     200/200
3  mapPartitions at Exchange.scala:64  2015/02/20 10:32:07  16 min   200/200  1112.3 MB
2  mapPartitions at Exchange.scala:64  2015/02/20 10:22:06  9 min    40/40    4.7 GB   22.2 GB
1  mapPartitions at Exchange.scala:64  2015/02/20 10:22:06  1.9 min  50/50    6.2 GB   2.8 GB
0  mapPartitions at Exchange.scala:64  2015/02/20 10:22:06  6 s      2/2      2.3 MB   156.6 KB
So the wall time of the whole query is 26s + 16m + 9m + 2m + 6s, around 28 minutes. That is about 56% of the original time, not bad. But I want to know whether any tuning of Spark can make it even faster. For stages 2 and 3, I observed that GC time gets more and more expensive. Especially in stage 3, shown below:
For stage 3:
Metric         Min     25th percentile  Median  75th percentile  Max
Duration       20 s    30 s             35 s    39 s             2.4 min
GC Time        9 s     17 s             20 s    25 s             2.2 min
Shuffle Write  4.7 MB  4.9 MB           5.2 MB  6.1 MB           8.3 MB
So at the median, GC took 20s/35s = 57% of the time. The first change I made was to add the following line in spark-defaults.conf:
spark.serializer org.apache.spark.serializer.KryoSerializer
My assumption is that using KryoSerializer instead of the default Java serialization will lower the memory footprint and should lower GC pressure at runtime. I know I changed the correct spark-defaults.conf, because if I add
spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
in the same file, I see the GC activity in the stdout file. Of course, in this test I didn't add that, as I want to make only one change at a time. The result is almost the same as with standard Java serialization: the wall time is still 28 minutes, and in stage 3 GC still took around 50 to 60% of the time, with almost the same min/median/max results and no noticeable performance gain. Next, based on my understanding, I think the default spark.storage.memoryFraction is too high for this query: there is no reason to reserve so much memory for caching data, because we don't reuse any dataset in this one query. So I added --conf spark.storage.memoryFraction=0.3 at the end of the spark-shell command, reserving half as much memory for caching as the first time. Of course, this time I rolled back the first change (KryoSerializer). The result again looks almost the same: the whole query finished in around 28s + 14m + 9.6m + 1.9m + 6s = 27 minutes. It looks like Spark is faster than Hive, but are there any steps I can take to make it even faster? Why does using KryoSerializer make no difference? If I want to use the same resources as now, is there anything I can do to speed it up more, especially to lower the GC time? Thanks Yong
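One possible explanation for the flat Kryo result: Kryo mostly pays off when the classes crossing the shuffle are registered, and Spark SQL serializes much of its shuffle data through its own code paths, so flipping spark.serializer alone may barely touch this workload. A hedged sketch of the next things to try (the record class here is hypothetical):

import org.apache.spark.SparkConf

case class LogRecord(id: Long, payload: String) // hypothetical shuffled type

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[LogRecord])) // unregistered classes ship full class names per object
  .set("spark.storage.memoryFraction", "0.3")     // nothing is reused, so shrink the cache pool
  .set("spark.shuffle.memoryFraction", "0.4")     // give the joins' shuffles the reclaimed room

Since the GC cost concentrates in the wide shuffle stages, moving memory from storage to shuffle, rather than only shrinking storage, is the lever most likely to cut spill and GC churn here.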
Re: using hivecontext with sparksql on cdh 5.3
Thanks! I am able to login to Spark now but I am still getting the same error scala sqlContext.sql(FROM analytics.trainingdatafinal SELECT *).collect().foreach(println) 15/02/20 14:40:22 INFO ParseDriver: Parsing command: FROM analytics.trainingdatafinal SELECT * 15/02/20 14:40:22 INFO ParseDriver: Parse Completed 15/02/20 14:40:23 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/02/20 14:40:23 INFO ObjectStore: ObjectStore, initialize called 15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus.api.jdo is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-api-jdo-3.2.6.jar is already registered, and you are trying to register an identical plugin located at URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-api-jdo-3.2.6.jar. 15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus.store.rdbms is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-rdbms-3.2.9.jar is already registered, and you are trying to register an identical plugin located at URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-rdbms-3.2.9.jar. 15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-core-3.2.10.jar is already registered, and you are trying to register an identical plugin located at URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-core-3.2.10.jar. 15/02/20 14:40:23 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored 15/02/20 14:40:23 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored 15/02/20 14:40:25 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order 15/02/20 14:40:25 INFO MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5. Encountered: @ (64), after : . 15/02/20 14:40:27 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 15/02/20 14:40:27 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 15/02/20 14:40:28 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 15/02/20 14:40:28 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 
15/02/20 14:40:28 INFO Query: Reading in results for query org.datanucleus.store.rdbms.query.SQLQuery@0 since the connection used is closing 15/02/20 14:40:28 INFO ObjectStore: Initialized ObjectStore 15/02/20 14:40:28 INFO HiveMetaStore: Added admin role in metastore 15/02/20 14:40:28 INFO HiveMetaStore: Added public role in metastore 15/02/20 14:40:29 INFO HiveMetaStore: No user is added in admin role, since config is empty 15/02/20 14:40:29 INFO SessionState: No Tez session required at this point. hive.execution.engine=mr. 15/02/20 14:40:29 INFO HiveMetaStore: 0: get_table : db=analytics tbl=trainingdatafinal 15/02/20 14:40:29 INFO audit: ugi=hdfs ip=unknown-ip-addr cmd=get_table : db=analytics tbl=trainingdatafinal 15/02/20 14:40:29 ERROR Hive: NoSuchObjectException(message:analytics.trainingdatafinal table not found) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1569) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:106) at com.sun.proxy.$Proxy24.get_table(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1008) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:90) at com.sun.proxy.$Proxy25.getTable(Unknown Source) at
Re: using hivecontext with sparksql on cdh 5.3
Also, you might want to add the hadoop configs: HADOOP_CONF_DIR=/etc/hadoop/conf:/etc/hive/conf --driver-class-path '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/ parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' Ok. It needs the CDH configs for hive and hadoop. Hopefully this works for you. On Fri, Feb 20, 2015 at 3:41 PM, chirag lakhani chirag.lakh...@gmail.com wrote: Thanks! I am able to login to Spark now but I am still getting the same error scala sqlContext.sql(FROM analytics.trainingdatafinal SELECT *).collect().foreach(println) 15/02/20 14:40:22 INFO ParseDriver: Parsing command: FROM analytics.trainingdatafinal SELECT * 15/02/20 14:40:22 INFO ParseDriver: Parse Completed 15/02/20 14:40:23 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 15/02/20 14:40:23 INFO ObjectStore: ObjectStore, initialize called 15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus.api.jdo is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-api-jdo-3.2.6.jar is already registered, and you are trying to register an identical plugin located at URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-api-jdo-3.2.6.jar. 15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus.store.rdbms is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-rdbms-3.2.9.jar is already registered, and you are trying to register an identical plugin located at URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-rdbms-3.2.9.jar. 15/02/20 14:40:23 WARN General: Plugin (Bundle) org.datanucleus is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/jars/datanucleus-core-3.2.10.jar is already registered, and you are trying to register an identical plugin located at URL file:/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/datanucleus-core-3.2.10.jar. 15/02/20 14:40:23 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored 15/02/20 14:40:23 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored 15/02/20 14:40:25 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes=Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order 15/02/20 14:40:25 INFO MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5. Encountered: @ (64), after : . 15/02/20 14:40:27 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 15/02/20 14:40:27 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 15/02/20 14:40:28 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 
15/02/20 14:40:28 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 15/02/20 14:40:28 INFO Query: Reading in results for query org.datanucleus.store.rdbms.query.SQLQuery@0 since the connection used is closing 15/02/20 14:40:28 INFO ObjectStore: Initialized ObjectStore 15/02/20 14:40:28 INFO HiveMetaStore: Added admin role in metastore 15/02/20 14:40:28 INFO HiveMetaStore: Added public role in metastore 15/02/20 14:40:29 INFO HiveMetaStore: No user is added in admin role, since config is empty 15/02/20 14:40:29 INFO SessionState: No Tez session required at this point. hive.execution.engine=mr. 15/02/20 14:40:29 INFO HiveMetaStore: 0: get_table : db=analytics tbl=trainingdatafinal 15/02/20 14:40:29 INFO audit: ugi=hdfs ip=unknown-ip-addr cmd=get_table : db=analytics tbl=trainingdatafinal 15/02/20 14:40:29 ERROR Hive: NoSuchObjectException(message:analytics.trainingdatafinal table not found) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1569) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:106) at
Re: Spark Performance on Yarn
That's all correct. -Sandy On Fri, Feb 20, 2015 at 1:23 PM, Kelvin Chu 2dot7kel...@gmail.com wrote: Hi Sandy, I appreciate your clear explanation. Let me try again. It's the best way to confirm I understand. spark.executor.memory + spark.yarn.executor.memoryOverhead = the memory that YARN will create a JVM spark.executor.memory = the memory I can actually use in my jvm application = part of it (spark.storage.memoryFraction) is reserved for caching + part of it (spark.shuffle.memoryFraction) is reserved for shuffling + the remaining is for bookkeeping UDFs If I am correct above, then one implication from them is: (spark.executor.memory + spark.yarn.executor.memoryOverhead) * number of executors per machine should be configured smaller than a single machine physical memory Right? Again, thanks! Kelvin On Fri, Feb 20, 2015 at 11:50 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Kelvin, spark.executor.memory controls the size of the executor heaps. spark.yarn.executor.memoryOverhead is the amount of memory to request from YARN beyond the heap size. This accounts for the fact that JVMs use some non-heap memory. The Spark heap is divided into spark.storage.memoryFraction (default 0.6) and spark.shuffle.memoryFraction (default 0.2), and the rest is for basic Spark bookkeeping and anything the user does inside UDFs. -Sandy On Fri, Feb 20, 2015 at 11:44 AM, Kelvin Chu 2dot7kel...@gmail.com wrote: Hi Sandy, I am also doing memory tuning on YARN. Just want to confirm, is it correct to say: spark.executor.memory - spark.yarn.executor.memoryOverhead = the memory I can actually use in my jvm application If it is not, what is the correct relationship? Any other variables or config parameters in play? Thanks. Kelvin On Fri, Feb 20, 2015 at 9:45 AM, Sandy Ryza sandy.r...@cloudera.com wrote: If that's the error you're hitting, the fix is to boost spark.yarn.executor.memoryOverhead, which will put some extra room in between the executor heap sizes and the amount of memory requested for them from YARN. -Sandy On Fri, Feb 20, 2015 at 9:40 AM, lbierman leebier...@gmail.com wrote: A bit more context on this issue. From the container logs on the executor Given my cluster specs above what would be appropriate parameters to pass into : --num-executors --num-cores --executor-memory I had tried it with --executor-memory 2500MB 015-02-20 06:50:09,056 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=23320,containerID=container_1423083596644_0238_01_004160] is running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing container. 
Dump of the process-tree for container_1423083596644_0238_01_004160 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash -c /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms2400m -Xmx2400m -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal :42535/user/CoarseGrainedScheduler 8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout 2 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr |- 23323 23320 23320 23320 (java) 922271 12263 461976 724218 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms2400m -Xmx2400m -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p21739.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: loads of memory still GC overhead limit exceeded
Hi Ilya, thanks for your insight, this was the right clue. I had default parallelism already set but it was quite low (hundreds) and moreover the number of partitions of the input RDD was low as well so the chunks were really too big. Increased parallelism and repartitioning seems to be helping... Thanks! Antony.
On Thursday, 19 February 2015, 16:45, Ilya Ganelin ilgan...@gmail.com wrote: Hi Anthony - you are seeing a problem that I ran into. The underlying issue is your default parallelism setting. What's happening is that within ALS certain RDD operations end up changing the number of partitions you have of your data. For example if you start with an RDD of 300 partitions, unless default parallelism is set while the algorithm executes you'll eventually get an RDD with something like 20 partitions. Consequently, your giant data set is now stored across a much smaller number of partitions so each partition is huge. Then, when a shuffle requires serialization you run out of heap space trying to serialize it. The solution should be as simple as setting the default parallelism setting. This is referenced in a JIRA I can't find at the moment.
On Thu, Feb 19, 2015 at 5:10 AM Antony Mayi antonym...@yahoo.com.invalid wrote: now with reverted spark.shuffle.io.preferDirectBufs (to true) getting again GC overhead limit exceeded:
=== spark stdout ===
15/02/19 12:08:08 WARN scheduler.TaskSetManager: Lost task 7.0 in stage 18.0 (TID 5329, 192.168.1.93): java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
=== yarn log (same) ===
15/02/19 12:08:08 ERROR executor.Executor: Exception in task 7.0 in stage 18.0 (TID 5329)
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
=== yarn nodemanager ===
2015-02-19 12:08:13,758 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19014 for container-id container_1424204221358_0013_01_12: 29.8 GB of 32 GB physical memory used; 31.7 GB of 67.2 GB virtual memory used
2015-02-19 12:08:13,778 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 19013 for container-id container_1424204221358_0013_01_08: 1.2 MB of 32 GB physical memory used; 103.6 MB of 67.2 GB virtual memory used
2015-02-19 12:08:14,455 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1424204221358_0013_01_08 is : 143
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1424204221358_0013_01_08 transitioned from RUNNING to EXITED_WITH_FAILURE
2015-02-19 12:08:14,455 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1424204221358_0013_01_08
Antony.
On Thursday, 19 February 2015, 11:54, Antony Mayi antonym...@yahoo.com.INVALID wrote: it is from within the ALS.trainImplicit() call. btw. the exception varies between this GC overhead limit exceeded and Java heap space (which I guess is just different outcome of same problem). just tried another run and here are the logs (filtered) - note I tried this run with spark.shuffle.io.preferDirectBufs=false so this might be slightly different issue from my previous case (going to revert now):
=== spark stdout ===
15/02/19 10:15:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(6, 192.168.1.92, 54289) with no recent heart beats: 50221ms exceeds 45000ms
15/02/19 10:16:05 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(13, 192.168.1.90, 56768) with no recent heart beats: 54749ms exceeds 45000ms
15/02/19 10:16:44 ERROR cluster.YarnClientClusterScheduler: Lost executor 6 on 192.168.1.92: remote Akka client disassociated
15/02/19 10:16:44 WARN scheduler.TaskSetManager: Lost task 57.0 in stage 18.0 (TID 5379,
Re: No executors allocated on yarn with latest master branch
Are you using the capacity scheduler, or the fifo scheduler without multi-resource scheduling, by any chance? On Thu, Feb 12, 2015 at 1:51 PM, Anders Arpteg arp...@spotify.com wrote: The nm logs only seem to contain something similar to the following. Nothing else in the same time range. Any help?
2015-02-12 20:47:31,245 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_02
2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_12
2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_22
2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_32
2015-02-12 20:47:31,246 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: KILL_CONTAINER sent to absent container container_1422406067005_0053_01_42
2015-02-12 21:24:30,515 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Event EventType: FINISH_APPLICATION sent to absent application application_1422406067005_0053
On Thu, Feb 12, 2015 at 10:38 PM, Sandy Ryza sandy.r...@cloudera.com wrote: It seems unlikely to me that it would be a 2.2 issue, though not entirely impossible. Are you able to find any of the container logs? Is the NodeManager launching containers and reporting some exit code? -Sandy On Thu, Feb 12, 2015 at 1:21 PM, Anders Arpteg arp...@spotify.com wrote: No, not submitting from windows, from a debian distribution. Had a quick look at the rm logs, and it seems some containers are allocated but then released again for some reason. Not easy to make sense of the logs, but here is a snippet from the logs (from a test in our small test cluster) if you'd like to have a closer look: http://pastebin.com/8WU9ivqC Sandy, sounds like it could possibly be a 2.2 issue then, or what do you think? Thanks, Anders On Thu, Feb 12, 2015 at 3:11 PM, Aniket Bhatnagar aniket.bhatna...@gmail.com wrote: This is tricky to debug. Check the logs of the YARN node and resource managers to see if you can trace the error. In the past I have had to look closely at the arguments getting passed to the YARN container (they get logged before attempting to launch containers). If I still didn't get a clue, I had to check the script generated by YARN to execute the container and even run it manually to trace at what line the error occurred. BTW, are you submitting the job from windows? On Thu, Feb 12, 2015, 3:34 PM Anders Arpteg arp...@spotify.com wrote: Interesting to hear that it works for you. Are you using Yarn 2.2 as well? No strange log messages during startup, and I can't see any other log messages since no executor gets launched. It does not seem to work in yarn-client mode either, failing with the exception below.
Exception in thread "main" org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:119)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:59)
at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:141)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:370)
at com.spotify.analytics.AnalyticsSparkContext.<init>(AnalyticsSparkContext.scala:8)
at com.spotify.analytics.DataSampler$.main(DataSampler.scala:42)
at com.spotify.analytics.DataSampler.main(DataSampler.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:551)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:155)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:178)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:99)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
/Anders
On Thu, Feb 12, 2015 at 1:33 AM, Sandy Ryza sandy.r...@cloudera.com wrote: Hi Anders, I just tried this out and was able to
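[For the archive: if the capacity scheduler is in use, its stock DefaultResourceCalculator accounts only for memory, so CPU-based container requests can behave unexpectedly. The usual fix is to switch the calculator in capacity-scheduler.xml; this is a Hadoop-side setting, so verify the property against your Hadoop version:]

<!-- capacity-scheduler.xml: make the capacity scheduler account for vcores as well as memory -->
<property>
  <name>yarn.scheduler.capacity.resource-calculator</name>
  <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>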
Re: what does Submitting ... missing tasks from Stage mean?
yeah, this is just the totally normal message when spark executes something. The first time something is run, all of its tasks are missing. I would not worry about cases where not all tasks are missing if you're new to spark; it's probably an advanced concept that you don't care about (and would take me some time to explain :) On Fri, Feb 20, 2015 at 8:20 AM, shahab shahab.mok...@gmail.com wrote: Hi, Probably this is a silly question, but I couldn't find any clear documentation explaining why one sees "Submitting ... missing tasks from Stage ..." in the logs. Especially in my case, when I do not have any failure in job execution, I wonder why this should happen? Does it have any relation to lazy evaluation? best, /Shahab
Re: Which OutputCommitter to use for S3?
I didn’t get any response. It’d be really appreciated if anyone using a special OutputCommitter for S3 can comment on this! Thanks, Mingyu From: Mingyu Kim m...@palantir.com Date: Monday, February 16, 2015 at 1:15 AM To: user@spark.apache.org Subject: Which OutputCommitter to use for S3? Hi all, The default OutputCommitter used by RDD, which is FileOutputCommitter, seems to require moving files at the commit step, which is not a constant-time operation in S3, as discussed in http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3c543e33fa.2000...@entropy.be%3E. People seem to develop their own NullOutputCommitter implementation or use DirectFileOutputCommitter (as mentioned in SPARK-3595, https://issues.apache.org/jira/browse/SPARK-3595), but I wanted to check if there is a de facto standard, publicly available OutputCommitter to use for S3 in conjunction with Spark. Thanks, Mingyu
Re: Spark Performance on Yarn
Thanks for the suggestions. I'm experimenting with different values for spark memoryOverhead and explicitly giving the executors more memory, but still have not found the happy medium to get it to finish in a proper time frame. Is my cluster massively undersized at 5 boxes, 8gb 2cpu? Trying to figure out a memory setting and executor setting so it runs on many containers in parallel. I'm still struggling, as pig jobs and hive jobs on the same whole data set don't take as long. I'm wondering too if the logic in our code is just doing something silly, causing multiple reads of all the data. On Fri, Feb 20, 2015 at 9:45 AM, Sandy Ryza sandy.r...@cloudera.com wrote: If that's the error you're hitting, the fix is to boost spark.yarn.executor.memoryOverhead, which will put some extra room in between the executor heap sizes and the amount of memory requested for them from YARN. -Sandy On Fri, Feb 20, 2015 at 9:40 AM, lbierman leebier...@gmail.com wrote: A bit more context on this issue, from the container logs on the executor. Given my cluster specs above, what would be appropriate parameters to pass into: --num-executors --num-cores --executor-memory I had tried it with --executor-memory 2500MB
2015-02-20 06:50:09,056 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=23320,containerID=container_1423083596644_0238_01_004160] is running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing container.
Dump of the process-tree for container_1423083596644_0238_01_004160 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash -c /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms2400m -Xmx2400m -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/CoarseGrainedScheduler 8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout 2 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr
|- 23323 23320 23320 23320 (java) 922271 12263 461976 724218 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms2400m -Xmx2400m -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p21739.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
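[For later readers: a sketch of what raising the overhead could look like at submit time. The sizes are illustrative for 8 GB / 2-core nodes, not tuned values; spark.yarn.executor.memoryOverhead is specified in megabytes.]

spark-submit --class com.myco.SparkJob \
  --master yarn \
  --num-executors 4 \
  --executor-cores 1 \
  --executor-memory 2g \
  --conf spark.yarn.executor.memoryOverhead=768 \
  /tmp/sparkjob.jar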
Re: Which OutputCommitter to use for S3?
We (Databricks) use our own DirectOutputCommitter implementation, which is a couple dozen lines of Scala code. The class would almost entirely be a no-op, except we took some care to properly handle the _SUCCESS file. On Fri, Feb 20, 2015 at 3:52 PM, Mingyu Kim m...@palantir.com wrote: I didn’t get any response. It’d be really appreciated if anyone using a special OutputCommitter for S3 can comment on this! Thanks, Mingyu From: Mingyu Kim m...@palantir.com Date: Monday, February 16, 2015 at 1:15 AM To: user@spark.apache.org Subject: Which OutputCommitter to use for S3? Hi all, The default OutputCommitter used by RDD, which is FileOutputCommitter, seems to require moving files at the commit step, which is not a constant-time operation in S3, as discussed in http://mail-archives.apache.org/mod_mbox/spark-user/201410.mbox/%3c543e33fa.2000...@entropy.be%3E. People seem to develop their own NullOutputCommitter implementation or use DirectFileOutputCommitter (as mentioned in SPARK-3595, https://issues.apache.org/jira/browse/SPARK-3595), but I wanted to check if there is a de facto standard, publicly available OutputCommitter to use for S3 in conjunction with Spark. Thanks, Mingyu
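[The Databricks class itself isn't shown in the thread, so this is only a sketch of what such a committer might look like against the old org.apache.hadoop.mapred API: every phase is a no-op because tasks write straight to the final location, and commitJob writes the _SUCCESS marker by hand. Note the trade-off: you give up the atomicity that FileOutputCommitter's rename provides, so a failed task can leave partial files behind.]

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputFormat, JobContext, OutputCommitter, TaskAttemptContext}

class DirectOutputCommitter extends OutputCommitter {
  // Tasks write directly to the output path, so setup/commit/abort do nothing.
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}

  // Write the _SUCCESS marker ourselves, since FileOutputCommitter isn't doing it.
  override def commitJob(jobContext: JobContext): Unit = {
    val conf = jobContext.getJobConf
    val outputPath = FileOutputFormat.getOutputPath(conf)
    if (outputPath != null) {
      val fs = outputPath.getFileSystem(conf)
      fs.create(new Path(outputPath, "_SUCCESS")).close()
    }
  }
}

[It would be wired in for the old-API save paths with something like sc.hadoopConfiguration.set("mapred.output.committer.class", classOf[DirectOutputCommitter].getName).]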
Re: randomSplit instead of a huge map reduce ?
Is there a check you can put in place to not create pairs that aren't in your set of 20M pairs? Additionally, once you have your arrays converted to pairs, you can do aggregateByKey with each pair being the key. On Feb 20, 2015 1:57 PM, shlomib shl...@summerhq.com wrote: Hi, I am new to Spark and I think I missed something very basic. I have the following use case (I use Java and run Spark locally on my laptop): I have a JavaRDD<String[]> - The RDD contains around 72,000 arrays of strings (String[]) - Each array contains 80 words (on average). What I want to do is to convert each array into a new array/list of pairs, for example: Input: String[] words = ['a', 'b', 'c'] Output: List[(String, String)] pairs = [('a', 'b'), ('a', 'c'), ('b', 'c')] and then I want to count the number of times each pair appeared, so my final output should be something like: Output: List[(String, String, Integer)] result = [('a', 'b', 3), ('a', 'c', 8), ('b', 'c', 10)] The problem: Since each array contains around 80 words, it returns around 3,200 pairs, so after "mapping" my entire RDD I get 3,200 * 72,000 = *230,400,000* pairs to reduce, which requires way too much memory. (I know I have only around *20,000,000* unique pairs!) I already modified my code and used 'mapPartitions' instead of 'map'. It definitely improved the performance, but I still feel I'm doing something completely wrong. I was wondering if this is the right 'Spark way' to solve this kind of problem, or maybe I should do something like splitting my original RDD into smaller parts (by using randomSplit), then iterate over each part, aggregate the results into some result RDD (by using 'union') and move on to the next part. Can anyone please explain to me which solution is better? Thank you very much, Shlomi. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/randomSplit-instead-of-a-huge-map-reduce-tp21744.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
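[In Scala for brevity (the Java version is mechanically the same): the point of the reduceByKey/aggregateByKey suggestion is that pairs are combined map-side within each partition before anything is shuffled, so the 230M raw pairs never have to sit in memory at once and no randomSplit loop is needed. A sketch, assuming words: RDD[Array[String]]:]

val pairCounts = words
  .flatMap { arr =>
    // distinct dedupes within an array; sorting gives each unordered pair one canonical form
    arr.distinct.sorted.combinations(2).map { case Array(a, b) => ((a, b), 1) }
  }
  .reduceByKey(_ + _) // map-side combining shrinks the shuffle to roughly the ~20M unique pairs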
About FlumeUtils.createStream
Hi, In my spark streaming application I write the code FlumeUtils.createStream(ssc, "localhost", ), which means spark will listen on the given port and wait for a Flume sink to write to it. My question is: when I submit the application to the Spark standalone cluster, will the port be opened only on the driver machine, or will all the workers also open the port and wait for the Flume data?
Force RDD evaluation
Is there a technique for forcing the evaluation of an RDD? I have used actions to do so but even the most basic count has a non-negligible cost (even on a cached RDD, repeated calls to count take time). My use case is for logging the execution time of the major components in my application. At the end of each component I have a statement like rdd.cache().count() and time how long it takes. Thanks in advance for any advice! Nick -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Force-RDD-evaluation-tp21748.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
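[One technique that comes up for this: trigger the computation with an action that returns nothing to the driver, such as foreachPartition with an empty body, which skips count's aggregation pass. A sketch with a made-up timing helper, assuming rdd is in scope:]

def timed[A](label: String)(block: => A): A = {
  val start = System.nanoTime()
  val result = block
  println(s"$label took ${(System.nanoTime() - start) / 1e6} ms")
  result
}

// Materializes (and caches) the RDD without shipping results to the driver.
timed("component A") {
  rdd.cache()
  rdd.foreachPartition(_ => ())
}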
Shuffle Spill
Hello, I have a few tasks, in a stage with lots of tasks, that show a large amount of shuffle spill. I scoured the web to understand shuffle spill and did not find any simple explanation of the spill mechanism. What I put together is:
1. shuffle spill can happen when the shuffle is written to disk (i.e. by the last map stage, as opposed to when the shuffle is read by the reduce stage)
2. it happens when a task has a lot to write to the shuffle, and since that shuffle needs to be sorted by key, the spilling mechanism allows Spark to do that sort without holding everything in memory
I am unclear, however, whether a large task will systematically lead to shuffle spill, or whether the number of keys (for the next reduce stage) that particular task encounters also has an impact. Concretely, let's say I have:
val ab = RDD[(a,b)]
val ac = RDD[(a,c)]
val bd = RDD[(b,d)]
and I do:
val bc = ab.join(ac).values // we investigate this task, triggered by values
val cd = bc.join(bd).values
The task we investigate reads from a previous shuffle and will write to another shuffle to prepare for the second join. I know that I have data skew on a key in a, meaning a few tasks are expected to be large and I have stragglers. Now, is that the cause of the shuffle spill, or is it because those straggler tasks also happen to have in their midst a very large number of distinct b's? Thanks
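[Not an answer to the mechanism question, but one way to test the skew hypothesis directly is to measure the key distribution feeding the first join. A sketch, treating ab from the pseudo-code above as a real RDD[(String, B)]:]

// How many records share each join key on the left side of the first join?
val keyHistogram = ab.map { case (a, _) => (a, 1L) }.reduceByKey(_ + _)

val byCount = Ordering.by[(String, Long), Long](_._2)
keyHistogram.top(10)(byCount).foreach { case (key, n) =>
  println(s"join key $key occurs $n times")
}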
RE: using a database connection pool to write data into an RDBMS from a Spark application
Sean, I know that Class.forName is not required since Java 1.4 :-) It was just a desperate attempt to make sure that the Postgres driver is getting loaded. Since Class.forName("org.postgresql.Driver") is not throwing an exception, I assume that the driver is available in the classpath. Is that not true? I did some more troubleshooting and here is what I found:
1) The hive libraries used by Spark use BoneCP 0.7.1
2) When the Spark master is started, it initializes BoneCP, which will not load any database driver at that point (that makes sense)
3) When my application initializes BoneCP, it thinks it is already initialized and does not load the Postgres driver (this is a known bug in 0.7.1, fixed in the BoneCP 0.8.0 release)
So I linked my app with the BoneCP 0.8.0 release, but when I run my app using spark-submit, Spark continues to use BoneCP 0.7.1. How do I override that behavior? How do I make the spark-submit script unload BoneCP 0.7.1 and load BoneCP 0.8.0? I tried the --jars and --driver-classpath flags, but it didn't help. Thanks, Mohammed -----Original Message----- From: Sean Owen [mailto:so...@cloudera.com] Sent: Friday, February 20, 2015 2:06 AM To: Mohammed Guller Cc: Kelvin Chu; user@spark.apache.org Subject: Re: using a database connection pool to write data into an RDBMS from a Spark application Although I don't know if it's related, the Class.forName() method of loading drivers is very old. You should be using DataSource and javax.sql; this has been the usual practice since about Java 1.4. Why do you say a different driver is being loaded? That's not the error here. Try instantiating the driver directly to test whether it's available in the classpath. Otherwise you would have to check whether the jar exists, the class exists in it, and it's really on your classpath. On Fri, Feb 20, 2015 at 5:27 AM, Mohammed Guller moham...@glassbeam.com wrote: Hi Kelvin, Yes. I am creating an uber jar with the Postgres driver included, but nevertheless tried both --jars and --driver-classpath flags. It didn't help. Interestingly, I can't use BoneCP even in the driver program when I run my application with spark-submit. I am getting the same exception when the application initializes BoneCP before creating the SparkContext. It looks like Spark is loading a different version of the Postgres JDBC driver than the one that I am linking. Mohammed From: Kelvin Chu [mailto:2dot7kel...@gmail.com] Sent: Thursday, February 19, 2015 7:56 PM To: Mohammed Guller Cc: user@spark.apache.org Subject: Re: using a database connection pool to write data into an RDBMS from a Spark application Hi Mohammed, Did you use --jars to specify your jdbc driver when you submitted your job? Take a look at this link: http://spark.apache.org/docs/1.2.0/submitting-applications.html Hope this helps! Kelvin On Thu, Feb 19, 2015 at 7:24 PM, Mohammed Guller moham...@glassbeam.com wrote: Hi - I am trying to use BoneCP (a database connection pooling library) to write data from my Spark application to an RDBMS. The database inserts are inside a foreachPartition code block. I am getting this exception when the code tries to insert data using BoneCP: java.sql.SQLException: No suitable driver found for jdbc:postgresql://hostname:5432/dbname I tried explicitly loading the Postgres driver on the worker nodes by adding the following line inside the foreachPartition code block: Class.forName("org.postgresql.Driver") It didn't help. Has anybody been able to get a database connection pool library to work with Spark? If you got it working, can you please share the steps?
Thanks, Mohammed
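[For readers hitting the same "No suitable driver" error inside foreachPartition: one pattern that often works is registering a driver instance explicitly on the executor, which sidesteps DriverManager's classloader-sensitive lookup. A sketch only, in Scala, with made-up connection details and table, and without the pooling that BoneCP would add:]

import java.sql.DriverManager

rdd.foreachPartition { rows =>
  // Explicit registration avoids the classloader-dependent DriverManager scan.
  DriverManager.registerDriver(new org.postgresql.Driver())
  val conn = DriverManager.getConnection(
    "jdbc:postgresql://hostname:5432/dbname", "user", "password")
  try {
    val stmt = conn.prepareStatement("INSERT INTO events (payload) VALUES (?)")
    rows.foreach { row =>
      stmt.setString(1, row.toString)
      stmt.addBatch()
    }
    stmt.executeBatch() // one round trip per partition instead of per row
  } finally {
    conn.close()
  }
}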
Re: Spark Streaming and message ordering
You may also consider whether your use case really needs a very strict order, because configuring spark to support such a strict order means rendering most of its benefits useless (failure handling, parallelism, etc.). Usually, in a distributed setting you can order events, but this also means that you may need to wait for an unlimited time to be sure that you have received all the events to order. This is impractical, so people implement timeouts, which may lead to the case that you lose events, etc. The optimal thing would be to partition the data so that an order is only needed within each partition (across partitions is a different story...). All in all, implementing order in Spark depends on your requirements for ordering, and depending on those it can be easy or very difficult. You may also consider writing your own framework for mesos or yarn to better meet the requirements and keep your spark cluster config clean (what happens if there are spark jobs not requiring an order? They would be slowed down). So you need to think about: by which criteria can I order events, do I accept loss of events, do I need a global order over all events or is it only relevant for subsets (partitions), what is the impact of not ordering, what is the impact of loss of events, ... On 20 Feb 2015 18:01, Cody Koeninger c...@koeninger.org wrote: There is typically some slack between when a batch finishes executing and when the next batch is scheduled. You should be able to arrange your batch sizes / cluster resources to ensure that. If there isn't slack, your overall delay is going to keep increasing indefinitely. If you're inserting into mysql, you're probably going to be much better off doing bulk inserts anyway, and transaction ordering is going to stop a lot of overlap that might otherwise happen. In pseudocode:
stream.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    bulk = iter.filter(matchEvent).toList
    transaction { insert bulk }
  }
}
You may already know this, but getting jdbc to do true bulk inserts to mysql requires a bit of hoop jumping, so turn on query logging during development to make sure you aren't getting individual inserts. Also be aware that output actions aren't guaranteed to happen exactly once, so you'll need to store unique offset ids in mysql or otherwise deal with the possibility of executor failures. On Fri, Feb 20, 2015 at 10:39 AM, Neelesh neele...@gmail.com wrote: Thanks for the detailed response Cody. Our use case is to do some external lookups (cached and all) for every event, match the event against the looked-up data, decide whether to write an entry in mysql, and write it in the order in which the events arrived within a kafka partition. We don't need global ordering. Message ordering within a batch can be achieved either by waiting for 1.3 to be released (the behavior you described works very well for us, within a batch), or by using updateStateByKey and sorting. Speculative execution is turned off as well (I think it's off by default). But, from what I see in the JobScheduler/JobGenerator, this is what happens: within each stream, jobs are generated every 'n' milliseconds (the batch duration) and submitted for execution. Since job generation in a stream is temporal, it's guaranteed that the jobs are submitted in the order of event arrival within a stream. And since we have one stream per kafka partition, this translates to sequentially generated and sequentially scheduled batches within a kafka partition.
But since the *execution* of jobs itself is in parallel, it's probable that back-to-back batches in a stream are submitted one after the other, but are executing concurrently. If this understanding of mine is correct, it breaks our requirement that messages be executed in order within a partition. Thanks! On Fri, Feb 20, 2015 at 7:03 AM, Cody Koeninger c...@koeninger.org wrote: For a given batch, for a given partition, the messages will be processed in order by the executor that is running that partition. That's because messages for the given offset range are pulled by the executor, not pushed from some other receiver. If you have speculative execution, yes, another executor may be running that partition. If your job is lagging behind in processing such that the next batch starts executing before the last batch is finished processing, yes it is possible for some other executor to start working on messages from that same kafka partition. The obvious solution here seems to be to turn off speculative execution and adjust your batch interval / sizes such that they can comfortably finish processing :) If your processing time is sufficiently non-linear with regard to the number of messages, yes you might be able to do something with overriding dstream.compute. Unfortunately the new kafka dstream implementation is private, so it's not straightforward to subclass it. I'd like to get a solution in place for people
Re: Spark Streaming and message ordering
Thanks Jörn. Indeed, we do not need global ordering, since our data is partitioned well. We do not need ordering based on wallclock time either; that would require waiting indefinitely. All we need is for the execution of batches (not job submission) to happen in the same order they are generated, which it looks like is not enforced, but is more a side effect of how job submission happens as of now. Cody's suggestions are useful for our case, though I need to take a closer look at how job executions happen within a stream. Loss of parallelism and failure handling are an issue mainly for global ordering. Global ordering is a much harder problem and relevant only for a small set of use cases, in my opinion. Data is almost always partitioned in some way, and any specific ordering behavior is typically constrained within a partition in general. So for us: loss of events is unacceptable, events must be executed in order within a partition (strictly speaking, a 1-1 mapping with kafka partitions), and our execution logic is idempotent. All of these seem to be possible with 1.3, with some minor tweaks. Thanks! On Fri, Feb 20, 2015 at 9:24 AM, Jörn Franke jornfra...@gmail.com wrote: You may also consider whether your use case really needs a very strict order, because configuring spark to support such a strict order means rendering most of its benefits useless (failure handling, parallelism, etc.). Usually, in a distributed setting you can order events, but this also means that you may need to wait for an unlimited time to be sure that you have received all the events to order. This is impractical, so people implement timeouts, which may lead to the case that you lose events, etc. The optimal thing would be to partition the data so that an order is only needed within each partition (across partitions is a different story...). All in all, implementing order in Spark depends on your requirements for ordering, and depending on those it can be easy or very difficult. You may also consider writing your own framework for mesos or yarn to better meet the requirements and keep your spark cluster config clean (what happens if there are spark jobs not requiring an order? They would be slowed down). So you need to think about: by which criteria can I order events, do I accept loss of events, do I need a global order over all events or is it only relevant for subsets (partitions), what is the impact of not ordering, what is the impact of loss of events, ... On 20 Feb 2015 18:01, Cody Koeninger c...@koeninger.org wrote: There is typically some slack between when a batch finishes executing and when the next batch is scheduled. You should be able to arrange your batch sizes / cluster resources to ensure that. If there isn't slack, your overall delay is going to keep increasing indefinitely. If you're inserting into mysql, you're probably going to be much better off doing bulk inserts anyway, and transaction ordering is going to stop a lot of overlap that might otherwise happen. In pseudocode:
stream.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    bulk = iter.filter(matchEvent).toList
    transaction { insert bulk }
  }
}
You may already know this, but getting jdbc to do true bulk inserts to mysql requires a bit of hoop jumping, so turn on query logging during development to make sure you aren't getting individual inserts. Also be aware that output actions aren't guaranteed to happen exactly once, so you'll need to store unique offset ids in mysql or otherwise deal with the possibility of executor failures.
On Fri, Feb 20, 2015 at 10:39 AM, Neelesh neele...@gmail.com wrote: Thanks for the detailed response Cody. Our use case is to do some external lookups (cached and all) for every event, match the event against the looked-up data, decide whether to write an entry in mysql, and write it in the order in which the events arrived within a kafka partition. We don't need global ordering. Message ordering within a batch can be achieved either by waiting for 1.3 to be released (the behavior you described works very well for us, within a batch), or by using updateStateByKey and sorting. Speculative execution is turned off as well (I think it's off by default). But, from what I see in the JobScheduler/JobGenerator, this is what happens: within each stream, jobs are generated every 'n' milliseconds (the batch duration) and submitted for execution. Since job generation in a stream is temporal, it's guaranteed that the jobs are submitted in the order of event arrival within a stream. And since we have one stream per kafka partition, this translates to sequentially generated and sequentially scheduled batches within a kafka partition.
But since the *execution* of jobs itself is in parallel, it's probable that back-to-back batches in a stream are submitted one after the other, but are executing concurrently. If this understanding of mine is correct, it breaks our requirement that messages be executed
Re: Spark Streaming and message ordering
Thanks for the detailed response Cody. Our use case is to do some external lookups (cached and all) for every event, match the event against the looked-up data, decide whether to write an entry in mysql, and write it in the order in which the events arrived within a kafka partition. We don't need global ordering. Message ordering within a batch can be achieved either by waiting for 1.3 to be released (the behavior you described works very well for us, within a batch), or by using updateStateByKey and sorting. Speculative execution is turned off as well (I think it's off by default). But, from what I see in the JobScheduler/JobGenerator, this is what happens: within each stream, jobs are generated every 'n' milliseconds (the batch duration) and submitted for execution. Since job generation in a stream is temporal, it's guaranteed that the jobs are submitted in the order of event arrival within a stream. And since we have one stream per kafka partition, this translates to sequentially generated and sequentially scheduled batches within a kafka partition. But since the *execution* of jobs itself is in parallel, it's probable that back-to-back batches in a stream are submitted one after the other, but are executing concurrently. If this understanding of mine is correct, it breaks our requirement that messages be executed in order within a partition. Thanks! On Fri, Feb 20, 2015 at 7:03 AM, Cody Koeninger c...@koeninger.org wrote: For a given batch, for a given partition, the messages will be processed in order by the executor that is running that partition. That's because messages for the given offset range are pulled by the executor, not pushed from some other receiver. If you have speculative execution, yes, another executor may be running that partition. If your job is lagging behind in processing such that the next batch starts executing before the last batch is finished processing, yes it is possible for some other executor to start working on messages from that same kafka partition. The obvious solution here seems to be to turn off speculative execution and adjust your batch interval / sizes such that they can comfortably finish processing :) If your processing time is sufficiently non-linear with regard to the number of messages, yes you might be able to do something with overriding dstream.compute. Unfortunately the new kafka dstream implementation is private, so it's not straightforward to subclass it. I'd like to get a solution in place for people who need to be able to tune the batch generation policy (I need to as well, for unrelated reasons). Maybe you can say a little more about your use case. But regardless of the technology you're using to read from kafka (spark, storm, whatever), kafka only gives you ordering as to a particular partition. So you're going to need to do some kind of downstream sorting if you really care about a global order. On Fri, Feb 20, 2015 at 1:43 AM, Neelesh neele...@gmail.com wrote: Even with the new direct streams in 1.3, isn't it the case that the job *scheduling* follows the partition order, rather than job *execution*? Or is it the case that the stream listens to the job completion event (using a streamlistener) before scheduling the next batch? To compare with storm from a message ordering point of view: unless a tuple is fully processed by the DAG (as defined by spout+bolts), the next tuple does not enter the DAG. On Thu, Feb 19, 2015 at 9:47 PM, Cody Koeninger c...@koeninger.org wrote: Kafka ordering is guaranteed on a per-partition basis.
The high-level consumer api, as used by the spark kafka streams prior to 1.3, will consume from multiple kafka partitions, thus not giving any ordering guarantees. The experimental direct stream in 1.3 uses the simple consumer api, and there is a 1:1 correspondence between spark partitions and kafka partitions. So you will get deterministic ordering, but only on a per-partition basis. On Thu, Feb 19, 2015 at 11:31 PM, Neelesh neele...@gmail.com wrote: I had a chance to talk to TD today at the Strata+Hadoop Conf in San Jose. We talked a bit about this after his presentation: the short answer is that spark streaming does not guarantee any sort of ordering (within batches, across batches). One would have to use updateStateByKey to collect the events and sort them based on some attribute of the event. But TD said message ordering is a frequently asked-for feature recently and it is getting on his radar. I went through the source code and there does not seem to be any architectural/design limitation to supporting this. (JobScheduler and JobGenerator are a good starting point to see how stuff works under the hood.) Overriding DStream#compute and using a streaminglistener looks like a simple way of ensuring ordered execution of batches within a stream. But this would be a partial solution, since ordering within a batch needs some more work that I don't understand fully
Re: Spark Streaming and message ordering
There is typically some slack between when a batch finishes executing and when the next batch is scheduled. You should be able to arrange your batch sizes / cluster resources to ensure that. If there isn't slack, your overall delay is going to keep increasing indefinitely. If you're inserting into mysql, you're probably going to be much better off doing bulk inserts anyway, and transaction ordering is going to stop a lot of overlap that might otherwise happen. In pseudocode:
stream.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    bulk = iter.filter(matchEvent).toList
    transaction { insert bulk }
  }
}
You may already know this, but getting jdbc to do true bulk inserts to mysql requires a bit of hoop jumping, so turn on query logging during development to make sure you aren't getting individual inserts. Also be aware that output actions aren't guaranteed to happen exactly once, so you'll need to store unique offset ids in mysql or otherwise deal with the possibility of executor failures. On Fri, Feb 20, 2015 at 10:39 AM, Neelesh neele...@gmail.com wrote: Thanks for the detailed response Cody. Our use case is to do some external lookups (cached and all) for every event, match the event against the looked-up data, decide whether to write an entry in mysql, and write it in the order in which the events arrived within a kafka partition. We don't need global ordering. Message ordering within a batch can be achieved either by waiting for 1.3 to be released (the behavior you described works very well for us, within a batch), or by using updateStateByKey and sorting. Speculative execution is turned off as well (I think it's off by default). But, from what I see in the JobScheduler/JobGenerator, this is what happens: within each stream, jobs are generated every 'n' milliseconds (the batch duration) and submitted for execution. Since job generation in a stream is temporal, it's guaranteed that the jobs are submitted in the order of event arrival within a stream. And since we have one stream per kafka partition, this translates to sequentially generated and sequentially scheduled batches within a kafka partition. But since the *execution* of jobs itself is in parallel, it's probable that back-to-back batches in a stream are submitted one after the other, but are executing concurrently. If this understanding of mine is correct, it breaks our requirement that messages be executed in order within a partition. Thanks! On Fri, Feb 20, 2015 at 7:03 AM, Cody Koeninger c...@koeninger.org wrote: For a given batch, for a given partition, the messages will be processed in order by the executor that is running that partition. That's because messages for the given offset range are pulled by the executor, not pushed from some other receiver. If you have speculative execution, yes, another executor may be running that partition. If your job is lagging behind in processing such that the next batch starts executing before the last batch is finished processing, yes it is possible for some other executor to start working on messages from that same kafka partition. The obvious solution here seems to be to turn off speculative execution and adjust your batch interval / sizes such that they can comfortably finish processing :) If your processing time is sufficiently non-linear with regard to the number of messages, yes you might be able to do something with overriding dstream.compute. Unfortunately the new kafka dstream implementation is private, so it's not straightforward to subclass it.
I'd like to get a solution in place for people who need to be able to tune the batch generation policy (I need to as well, for unrelated reasons). Maybe you can say a little more about your use case. But regardless of the technology you're using to read from kafka (spark, storm, whatever), kafka only gives you ordering as to a particular partition. So you're going to need to do some kind of downstream sorting if you really care about a global order. On Fri, Feb 20, 2015 at 1:43 AM, Neelesh neele...@gmail.com wrote: Even with the new direct streams in 1.3, isn't it the case that the job *scheduling* follows the partition order, rather than job *execution*? Or is it the case that the stream listens to the job completion event (using a streamlistener) before scheduling the next batch? To compare with storm from a message ordering point of view: unless a tuple is fully processed by the DAG (as defined by spout+bolts), the next tuple does not enter the DAG. On Thu, Feb 19, 2015 at 9:47 PM, Cody Koeninger c...@koeninger.org wrote: Kafka ordering is guaranteed on a per-partition basis. The high-level consumer api, as used by the spark kafka streams prior to 1.3, will consume from multiple kafka partitions, thus not giving any ordering guarantees. The experimental direct stream in 1.3 uses the simple consumer api, and there is a 1:1 correspondence between spark
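[To make the exactly-once caveat concrete, here is a sketch of storing the Kafka offset range in the same MySQL transaction as the data, using the 1.3 direct stream's HasOffsetRanges. transaction, insertEvents, insertOffsets and matchEvent are hypothetical helpers, not Spark API:]

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // The offset ranges are only available on the driver, so grab them first.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val osr: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    val bulk = iter.filter(matchEvent).toList
    // Committing data and offsets atomically makes replays detectable:
    // a unique key on (topic, partition, fromOffset) rejects duplicates.
    transaction { conn =>
      insertEvents(conn, bulk)
      insertOffsets(conn, osr.topic, osr.partition, osr.fromOffset, osr.untilOffset)
    }
  }
}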
Re: GraphX:java.lang.NoSuchMethodError:org.apache.spark.graphx.Graph$.apply
Has anyone found a solution to this? I was able to reproduce it here http://stackoverflow.com/questions/28576439/getting-nosuchmethoderror-when-setting-up-spark-graphx-graph but I'm unable to resolve it. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-java-lang-NoSuchMethodError-org-apache-spark-graphx-Graph-apply-tp19958p21736.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Saving Spark RDD to Avro with spark.api.python.Converter
Hi! I am trying to persist an RDD in Avro format with the spark API. I wonder if someone has any experience or suggestions. My converter with an example can be viewed here https://github.com/daria-sukhareva/spark/commit/2ba7b213572d6ce2056cfc2536b701ae689c7f98 and the relevant question here http://stackoverflow.com/questions/28368694/saving-spark-rdd-to-avro-with-spark-api-python-converter Thanks, Daria -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Saving-Spark-RDD-to-Avro-with-spark-api-python-Converter-tp21738.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Performance on Yarn
A bit more context on this issue, from the container logs on the executor. Given my cluster specs above, what would be appropriate parameters to pass into: --num-executors --num-cores --executor-memory I had tried it with --executor-memory 2500MB
2015-02-20 06:50:09,056 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=23320,containerID=container_1423083596644_0238_01_004160] is running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing container.
Dump of the process-tree for container_1423083596644_0238_01_004160 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash -c /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms2400m -Xmx2400m -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/CoarseGrainedScheduler 8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout 2 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr
|- 23323 23320 23320 23320 (java) 922271 12263 461976 724218 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms2400m -Xmx2400m -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p21739.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Performance on Yarn
Are you specifying the executor memory, cores, or number of executors anywhere? If not, you won't be taking advantage of the full resources on the cluster. -Sandy On Fri, Feb 20, 2015 at 2:41 AM, Sean Owen so...@cloudera.com wrote: None of this really points to the problem. These indicate that workers died, but not why. I'd first go locate executor logs that reveal more about what's happening. It sounds like a hard-er type of failure, like a JVM crash, running out of file handles, or GC thrashing. On Fri, Feb 20, 2015 at 4:51 AM, lbierman leebier...@gmail.com wrote: I'm a bit new to Spark, but had a question on performance. I suspect a lot of my issue is due to tuning and parameters. I have a Hive external table on this data, and queries against it run in minutes.
The Job:
+ 40gb of avro events on HDFS (100 million+ avro events)
+ Read in the files from HDFS and dedupe events by key (mapToPair then a reduceByKey)
+ RDD returned and persisted (disk and memory)
+ Then passed to a job that takes the RDD, does a mapToPair of new object data, then reduceByKey and foreachPartition to do work
The issue: When I run this on my environment on Yarn, this takes 20+ hours. Running on yarn we see the first stage run to build the deduped RDD, but then when the next stage starts, things fail and data is lost. This results in stage 0 starting over and over and just dragging it out.
Errors I see in the driver logs:
ERROR cluster.YarnClientClusterScheduler: Lost executor 1 on X: remote Akka client disassociated
15/02/20 00:27:36 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.1 (TID 1335,): FetchFailed(BlockManagerId(3, i, 33958), shuffleId=1, mapId=162, reduceId=0, message= org.apache.spark.shuffle.FetchFailedException: Failed to connect to X/X:33958
Also we see this, but I'm suspecting this is because the previous stage fails and the next one starts:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1
Cluster: 5 machines, each 2 core, 8gb machines
Spark-submit command:
spark-submit --class com.myco.SparkJob \
--master yarn \
/tmp/sparkjob.jar \
Any thoughts on where to look or how to start approaching this problem, or more data points to present? Thanks.
Code for the job:
JavaRDD<AnalyticsEvent> events = ((JavaRDD<AvroKey<AnalyticsEvent>>) context.newAPIHadoopRDD(
    context.hadoopConfiguration(),
    AvroKeyInputFormat.class,
    AvroKey.class,
    NullWritable.class
).keys())
.map(event -> AnalyticsEvent.newBuilder(event.datum()).build())
.filter(key -> { return Optional.ofNullable(key.getStepEventKey()).isPresent(); })
.mapToPair(event -> new Tuple2<AnalyticsEvent, Integer>(event, 1))
.reduceByKey((analyticsEvent1, analyticsEvent2) -> analyticsEvent1)
.map(tuple -> tuple._1());
events.persist(StorageLevel.MEMORY_AND_DISK_2());
events.mapToPair(event -> {
    return new Tuple2<T, RunningAggregates>(
        keySelector.select(event),
        new RunningAggregates(
            Optional.ofNullable(event.getVisitors()).orElse(0L),
            Optional.ofNullable(event.getImpressions()).orElse(0L),
            Optional.ofNullable(event.getAmount()).orElse(0.0D),
            Optional.ofNullable(event.getAmountSumOfSquares()).orElse(0.0D)));
    })
.reduceByKey((left, right) -> { return left.add(right); })
.foreachPartition(dostuff)
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: using a database connection pool to write data into an RDBMS from a Spark application
Have a look at spark.yarn.user.classpath.first and spark.files.userClassPathFirst for a possible way to give your copy of the libs precedence. On Fri, Feb 20, 2015 at 5:20 PM, Mohammed Guller moham...@glassbeam.com wrote: Sean, I know that Class.forName is not required since Java 1.4 :-) It was just a desperate attempt to make sure that the Postgres driver is getting loaded. Since Class.forName("org.postgresql.Driver") is not throwing an exception, I assume that the driver is available in the classpath. Is that not true? I did some more troubleshooting and here is what I found:
1) The hive libraries used by Spark use BoneCP 0.7.1
2) When the Spark master is started, it initializes BoneCP, which will not load any database driver at that point (that makes sense)
3) When my application initializes BoneCP, it thinks it is already initialized and does not load the Postgres driver (this is a known bug in 0.7.1, fixed in the BoneCP 0.8.0 release)
So I linked my app with the BoneCP 0.8.0 release, but when I run my app using spark-submit, Spark continues to use BoneCP 0.7.1. How do I override that behavior? How do I make the spark-submit script unload BoneCP 0.7.1 and load BoneCP 0.8.0? I tried the --jars and --driver-classpath flags, but it didn't help. Thanks, Mohammed -----Original Message----- From: Sean Owen [mailto:so...@cloudera.com] Sent: Friday, February 20, 2015 2:06 AM To: Mohammed Guller Cc: Kelvin Chu; user@spark.apache.org Subject: Re: using a database connection pool to write data into an RDBMS from a Spark application Although I don't know if it's related, the Class.forName() method of loading drivers is very old. You should be using DataSource and javax.sql; this has been the usual practice since about Java 1.4. Why do you say a different driver is being loaded? That's not the error here. Try instantiating the driver directly to test whether it's available in the classpath. Otherwise you would have to check whether the jar exists, the class exists in it, and it's really on your classpath. On Fri, Feb 20, 2015 at 5:27 AM, Mohammed Guller moham...@glassbeam.com wrote: Hi Kelvin, Yes. I am creating an uber jar with the Postgres driver included, but nevertheless tried both --jars and --driver-classpath flags. It didn't help. Interestingly, I can't use BoneCP even in the driver program when I run my application with spark-submit. I am getting the same exception when the application initializes BoneCP before creating the SparkContext. It looks like Spark is loading a different version of the Postgres JDBC driver than the one that I am linking. Mohammed From: Kelvin Chu [mailto:2dot7kel...@gmail.com] Sent: Thursday, February 19, 2015 7:56 PM To: Mohammed Guller Cc: user@spark.apache.org Subject: Re: using a database connection pool to write data into an RDBMS from a Spark application Hi Mohammed, Did you use --jars to specify your jdbc driver when you submitted your job? Take a look at this link: http://spark.apache.org/docs/1.2.0/submitting-applications.html Hope this helps! Kelvin On Thu, Feb 19, 2015 at 7:24 PM, Mohammed Guller moham...@glassbeam.com wrote: Hi - I am trying to use BoneCP (a database connection pooling library) to write data from my Spark application to an RDBMS. The database inserts are inside a foreachPartition code block.
I am getting this exception when the code tries to insert data using BoneCP: java.sql.SQLException: No suitable driver found for jdbc:postgresql://hostname:5432/dbname I tried explicitly loading the Postgres driver on the worker nodes by adding the following line inside the foreachPartition code block: Class.forName("org.postgresql.Driver") It didn't help. Has anybody been able to get a database connection pool library to work with Spark? If you got it working, can you please share the steps?
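[As a concrete illustration of Sean's suggestion, the flags would be passed at submit time roughly like this. The class name, jar paths and assembly name are placeholders, and the exact property names and behavior vary across Spark versions, so verify against the docs for the release in use:]

spark-submit --class com.myco.MyApp \
  --master yarn \
  --jars /path/to/bonecp-0.8.0.jar \
  --conf spark.files.userClassPathFirst=true \
  --conf spark.yarn.user.classpath.first=true \
  myapp-assembly.jar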
Re: Setting the number of executors in standalone mode
Hi Mohammed, thanks a lot for the reply. Ok, so from what I understand I cannot control the number of executors per worker in standalone cluster mode. Is that correct? BR On 20 February 2015 at 17:46, Mohammed Guller moham...@glassbeam.com wrote: SPARK_WORKER_MEMORY=8g will allocate 8GB of memory to Spark on each worker node. It has nothing to do with the number of executors. Mohammed *From:* Yiannis Gkoufas [mailto:johngou...@gmail.com] *Sent:* Friday, February 20, 2015 4:55 AM *To:* user@spark.apache.org *Subject:* Setting the number of executors in standalone mode Hi there, I am trying to increase the number of executors per worker in standalone mode, and I have failed to achieve that. I followed the instructions in this thread: http://stackoverflow.com/questions/26645293/spark-configuration-memory-instance-cores and did this: spark.executor.memory 1g SPARK_WORKER_MEMORY=8g hoping to get 8 executors per worker, but it's still 1. And the option num-executors is not available in standalone mode. Thanks a lot!
Re: Spark Performance on Yarn
If that's the error you're hitting, the fix is to boost spark.yarn.executor.memoryOverhead, which will put some extra room in between the executor heap sizes and the amount of memory requested for them from YARN. -Sandy On Fri, Feb 20, 2015 at 9:40 AM, lbierman leebier...@gmail.com wrote: A bit more context on this issue, from the container logs on the executor. Given my cluster specs above, what would be appropriate parameters to pass into: --num-executors --num-cores --executor-memory I had tried it with --executor-memory 2500MB
2015-02-20 06:50:09,056 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=23320,containerID=container_1423083596644_0238_01_004160] is running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing container.
Dump of the process-tree for container_1423083596644_0238_01_004160 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash -c /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms2400m -Xmx2400m -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/CoarseGrainedScheduler 8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout 2 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr
|- 23323 23320 23320 23320 (java) 922271 12263 461976 724218 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms2400m -Xmx2400m -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p21739.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Setting the number of executors in standalone mode
SPARK_WORKER_MEMORY=8g will allocate 8GB of memory to Spark on each worker node. It has nothing to do with the number of executors. Mohammed From: Yiannis Gkoufas [mailto:johngou...@gmail.com] Sent: Friday, February 20, 2015 4:55 AM To: user@spark.apache.org Subject: Setting the number of executors in standalone mode Hi there, I am trying to increase the number of executors per worker in standalone mode, and I have failed to achieve that. I followed the instructions in this thread: http://stackoverflow.com/questions/26645293/spark-configuration-memory-instance-cores and did this: spark.executor.memory 1g SPARK_WORKER_MEMORY=8g hoping to get 8 executors per worker, but it's still 1. And the option num-executors is not available in standalone mode. Thanks a lot!
PySpark Cassandra forked
Hi all, Wanted to let you know I've forked PySpark Cassandra at https://github.com/TargetHolding/pyspark-cassandra. Unfortunately the original code didn't work for me and I couldn't figure out how it could work, but it inspired me, so I rewrote the majority of the project. The rewrite makes full use of https://github.com/datastax/spark-cassandra-connector and brings much of its goodness to PySpark! Hope that some of you are able to put this to good use. Feedback, pull requests, etc. are more than welcome! Best regards, Frens Jan
Stopping a Custom Receiver
Hi, I have a use case for creating a DStream from a single file. I have created a custom receiver that reads the file, calls 'store' with the contents, then calls 'stop'. However, I'm second-guessing whether this is the correct approach due to the Spark logs I see. I always see these logs, and the 'ERROR' and 'WARN' level makes me feel uneasy:

19:27:21,161 ERROR ReceiverTracker:75 - Deregistered receiver for stream 2: Finished reading file from /etc/tercel/PipelineTemplate.json
19:27:21,221 WARN ReceiverSupervisorImpl:71 - Stopped executor without error

In some situations (i.e. on a server instead of my laptop), I get this fatal error (Spark shuts down): 19:35:08,213 ERROR DAGSchedulerActorSupervisor:96 - eventProcesserActor failed; shutting down SparkContext org.apache.spark.SparkException: Attempted to use BlockRDD[3] at receiverStream at BootstrapMetadata.scala:26 after its blocks have been removed! at org.apache.spark.rdd.BlockRDD.assertValid(BlockRDD.scala:83) at org.apache.spark.rdd.BlockRDD.getPreferredLocations(BlockRDD.scala:56) at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:234) at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:234) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:233) ...

FYI, BootstrapMetadata.scala:26 is where I've declared the receiverStream. I am wondering if it is incorrect to call stop within the custom receiver, and if this is the reason why the blocks are removed. Thanks, Nick

P.S. This is the receiver implementation:

class FileReceiver(path: String) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  private var source: BufferedSource = null

  def onStart() {
    read()
  }

  def onStop(): Unit = {
    source.close()
  }

  private def read() {
    try {
      source = Source.fromFile(path)
      val content = source.getLines().mkString
      store(content)
      stop(s"Finished reading file from $path")
    } catch {
      case e: Exception => stop(s"Error reading file from $path", e)
    }
  }
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-a-Custom-Receiver-tp21740.html
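For what it's worth, if the goal is just a one-shot DStream over a single existing file, one alternative sketch avoids the receiver (and its shutdown logging) entirely by feeding the file in as a queue of one RDD; the context setup below is illustrative, reusing the path from the post above:

import scala.collection.mutable
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sc = new SparkContext("local[2]", "single-file-stream")
val ssc = new StreamingContext(sc, Seconds(1))

// Read the file once into an RDD and hand it to Streaming as one batch;
// with no receiver there is nothing to deregister and no receiver blocks to lose.
val fileRdd = sc.textFile("/etc/tercel/PipelineTemplate.json")
val oneShot = ssc.queueStream(mutable.Queue(fileRdd), oneAtATime = true)
oneShot.print()

ssc.start()
ssc.awaitTermination()

Note that textFile yields one element per line rather than the whole file as a single string, so downstream logic that expects the receiver's single-string behavior would need a small adjustment.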
RE: Setting the number of executors in standalone mode
AFAIK, in standalone mode, each Spark application gets one executor on each worker. You could run multiple workers on a machine though. Mohammed From: Yiannis Gkoufas [mailto:johngou...@gmail.com] Sent: Friday, February 20, 2015 9:48 AM To: Mohammed Guller Cc: user@spark.apache.org Subject: Re: Setting the number of executors in standalone mode Hi Mohammed, thanks a lot for the reply. Ok, so from what I understand I cannot control the number of executors per worker in standalone cluster mode. Is that correct? BR On 20 February 2015 at 17:46, Mohammed Guller moham...@glassbeam.com wrote: SPARK_WORKER_MEMORY=8g will allocate 8GB of memory to Spark on each worker node. It has nothing to do with the # of executors. Mohammed From: Yiannis Gkoufas [mailto:johngou...@gmail.com] Sent: Friday, February 20, 2015 4:55 AM To: user@spark.apache.org Subject: Setting the number of executors in standalone mode Hi there, I am trying to increase the number of executors per worker in standalone mode and have failed to achieve that. I followed the instructions in this thread: http://stackoverflow.com/questions/26645293/spark-configuration-memory-instance-cores and set: spark.executor.memory 1g SPARK_WORKER_MEMORY=8g hoping to get 8 executors per worker, but it's still 1. And the option num-executors is not available in standalone mode. Thanks a lot!
RE: using a database connection pool to write data into an RDBMS from a Spark application
SPARK_CLASSPATH has been deprecated since 1.0. In any case, I tried it and it didn't work, since it appends to the classpath. I need something that prepends to the classpath. Mohammed -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Friday, February 20, 2015 10:08 AM To: Mohammed Guller Cc: Kelvin Chu; user@spark.apache.org Subject: Re: using a database connection pool to write data into an RDBMS from a Spark application Hm, others can correct me if I'm wrong, but is this what SPARK_CLASSPATH is for? On Fri, Feb 20, 2015 at 6:04 PM, Mohammed Guller moham...@glassbeam.com wrote: It looks like spark.files.userClassPathFirst gives precedence to user libraries only on the worker nodes. Is there something similar to achieve the same behavior on the master? BTW, I am running Spark in stand-alone mode. Mohammed -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Friday, February 20, 2015 9:42 AM To: Mohammed Guller Cc: Kelvin Chu; user@spark.apache.org Subject: Re: using a database connection pool to write data into an RDBMS from a Spark application Have a look at spark.yarn.user.classpath.first and spark.files.userClassPathFirst for a possible way to give your copy of the libs precedence. On Fri, Feb 20, 2015 at 5:20 PM, Mohammed Guller moham...@glassbeam.com wrote: Sean, I know that Class.forName is not required since Java 1.4 :-) It was just a desperate attempt to make sure that the Postgres driver is getting loaded. Since Class.forName("org.postgresql.Driver") is not throwing an exception, I assume that the driver is available in the classpath. Is that not true? I did some more troubleshooting and here is what I found: 1) The Hive libraries used by Spark use BoneCP 0.7.1 2) When the Spark master is started, it initializes BoneCP, which will not load any database driver at that point (that makes sense) 3) When my application initializes BoneCP, it thinks it is already initialized and does not load the Postgres driver (this is a known bug in 0.7.1). This bug is fixed in the BoneCP 0.8.0 release. So I linked my app with the BoneCP 0.8.0 release, but when I run my app using spark-submit, Spark continues to use BoneCP 0.7.1. How do I override that behavior? How do I make the spark-submit script unload BoneCP 0.7.1 and load BoneCP 0.8.0? I tried the --jars and --driver-classpath flags, but it didn't help. Thanks, Mohammed -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Friday, February 20, 2015 2:06 AM To: Mohammed Guller Cc: Kelvin Chu; user@spark.apache.org Subject: Re: using a database connection pool to write data into an RDBMS from a Spark application Although I don't know if it's related, the Class.forName() method of loading drivers is very old. You should be using DataSource and javax.sql; this has been the usual practice since about Java 1.4. Why do you say a different driver is being loaded? That's not the error here. Try instantiating the driver directly to test whether it's available in the classpath. Otherwise you would have to check whether the jar exists, the class exists in it, and it's really on your classpath. On Fri, Feb 20, 2015 at 5:27 AM, Mohammed Guller moham...@glassbeam.com wrote: Hi Kelvin, Yes. I am creating an uber jar with the Postgres driver included, but nevertheless tried both --jars and --driver-classpath flags. It didn’t help. Interestingly, I can’t use BoneCP even in the driver program when I run my application with spark-submit. I am getting the same exception when the application initializes BoneCP before creating SparkContext.
It looks like Spark is loading a different version of the Postgres JDBC driver than the one that I am linking against. Mohammed From: Kelvin Chu [mailto:2dot7kel...@gmail.com] Sent: Thursday, February 19, 2015 7:56 PM To: Mohammed Guller Cc: user@spark.apache.org Subject: Re: using a database connection pool to write data into an RDBMS from a Spark application Hi Mohammed, Did you use --jars to specify your jdbc driver when you submitted your job? Take a look at this link: http://spark.apache.org/docs/1.2.0/submitting-applications.html Hope this helps! Kelvin On Thu, Feb 19, 2015 at 7:24 PM, Mohammed Guller moham...@glassbeam.com wrote: Hi – I am trying to use BoneCP (a database connection pooling library) to write data from my Spark application to an RDBMS. The database inserts are inside a foreachPartition code block. I am getting this exception when the code tries to insert data using BoneCP: java.sql.SQLException: No suitable driver found for jdbc:postgresql://hostname:5432/dbname I tried explicitly loading the Postgres driver on the worker nodes by adding the following line inside the foreachPartition code block: Class.forName("org.postgresql.Driver") It didn’t help. Has anybody been able to get a database connection pool library to work with Spark? If you got it working, can you please share the steps? Thanks, Mohammed
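Purely as a hedged suggestion for the precedence question: in this era spark.files.userClassPathFirst is an experimental executor-side setting, and a driver-side counterpart (spark.driver.userClassPathFirst) only appears in the 1.3 release candidates, so something along these lines may be worth an experiment:

./spark-submit --jars bonecp-0.8.0.jar --conf spark.files.userClassPathFirst=true --conf spark.driver.userClassPathFirst=true ...

Failing that, shading/relocating BoneCP 0.8.0's com.jolbox packages inside the uber jar (e.g. with the Maven Shade Plugin's relocation feature) avoids the collision with the 0.7.1 copy that Spark's Hive dependencies pull in.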
Spark 1.3 SQL Programming Guide and sql._ / sql.types._
Quickly reviewing the latest SQL Programming Guide https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md (in github) I had a couple of quick questions:

1) Do we need to instantiate the SparkContext as per

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Within Spark 1.3 the sqlContext is already available, so we probably do not need to make this call.

2) Importing org.apache.spark.sql._ should bring in both the SQL data types, struct types, and Row:

// Import Spark SQL data types and Row.
import org.apache.spark.sql._

Currently with Spark 1.3 RC1, it appears org.apache.spark.sql._ only brings in Row:

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> val schema =
     |   StructType(
     |     schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
<console>:25: error: not found: value StructType
         StructType(

But if I also import org.apache.spark.sql.types._:

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val schema =
     |   StructType(
     |     schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(DeviceMake,StringType,true), StructField(Country,StringType,true))

Wondering if this is by design or perhaps a quick documentation / package update is warranted.
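Putting the pieces together, a minimal snippet that compiles against the 1.3 RC (the schemaString value is taken from the transcript above):

import org.apache.spark.sql._        // Row, SQLContext
import org.apache.spark.sql.types._  // StructType, StructField, StringType

val schemaString = "DeviceMake Country"
val schema = StructType(
  schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)))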
RE: using a database connection pool to write data into an RDBMS from a Spark application
It looks like spark.files.userClassPathFirst gives precedence to user libraries only on the worker nodes. Is there something similar to achieve the same behavior on the master? BTW, I am running Spark in stand-alone mode. Mohammed -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Friday, February 20, 2015 9:42 AM To: Mohammed Guller Cc: Kelvin Chu; user@spark.apache.org Subject: Re: using a database connection pool to write data into an RDBMS from a Spark application Have a look at spark.yarn.user.classpath.first and spark.files.userClassPathFirst for a possible way to give your copy of the libs precedence. On Fri, Feb 20, 2015 at 5:20 PM, Mohammed Guller moham...@glassbeam.com wrote: Sean, I know that Class.forName is not required since Java 1.4 :-) It was just a desperate attempt to make sure that the Postgres driver is getting loaded. Since Class.forName("org.postgresql.Driver") is not throwing an exception, I assume that the driver is available in the classpath. Is that not true? I did some more troubleshooting and here is what I found: 1) The Hive libraries used by Spark use BoneCP 0.7.1 2) When the Spark master is started, it initializes BoneCP, which will not load any database driver at that point (that makes sense) 3) When my application initializes BoneCP, it thinks it is already initialized and does not load the Postgres driver (this is a known bug in 0.7.1). This bug is fixed in the BoneCP 0.8.0 release. So I linked my app with the BoneCP 0.8.0 release, but when I run my app using spark-submit, Spark continues to use BoneCP 0.7.1. How do I override that behavior? How do I make the spark-submit script unload BoneCP 0.7.1 and load BoneCP 0.8.0? I tried the --jars and --driver-classpath flags, but it didn't help. Thanks, Mohammed -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Friday, February 20, 2015 2:06 AM To: Mohammed Guller Cc: Kelvin Chu; user@spark.apache.org Subject: Re: using a database connection pool to write data into an RDBMS from a Spark application Although I don't know if it's related, the Class.forName() method of loading drivers is very old. You should be using DataSource and javax.sql; this has been the usual practice since about Java 1.4. Why do you say a different driver is being loaded? That's not the error here. Try instantiating the driver directly to test whether it's available in the classpath. Otherwise you would have to check whether the jar exists, the class exists in it, and it's really on your classpath. On Fri, Feb 20, 2015 at 5:27 AM, Mohammed Guller moham...@glassbeam.com wrote: Hi Kelvin, Yes. I am creating an uber jar with the Postgres driver included, but nevertheless tried both --jars and --driver-classpath flags. It didn’t help. Interestingly, I can’t use BoneCP even in the driver program when I run my application with spark-submit. I am getting the same exception when the application initializes BoneCP before creating SparkContext. It looks like Spark is loading a different version of the Postgres JDBC driver than the one that I am linking against. Mohammed From: Kelvin Chu [mailto:2dot7kel...@gmail.com] Sent: Thursday, February 19, 2015 7:56 PM To: Mohammed Guller Cc: user@spark.apache.org Subject: Re: using a database connection pool to write data into an RDBMS from a Spark application Hi Mohammed, Did you use --jars to specify your jdbc driver when you submitted your job? Take a look at this link: http://spark.apache.org/docs/1.2.0/submitting-applications.html Hope this helps!
Kelvin On Thu, Feb 19, 2015 at 7:24 PM, Mohammed Guller moham...@glassbeam.com wrote: Hi – I am trying to use BoneCP (a database connection pooling library) to write data from my Spark application to an RDBMS. The database inserts are inside a foreachPartition code block. I am getting this exception when the code tries to insert data using BoneCP: java.sql.SQLException: No suitable driver found for jdbc:postgresql://hostname:5432/dbname I tried explicitly loading the Postgres driver on the worker nodes by adding the following line inside the foreachPartition code block: Class.forName("org.postgresql.Driver") It didn’t help. Has anybody been able to get a database connection pool library to work with Spark? If you got it working, can you please share the steps? Thanks, Mohammed
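In case it helps while the classpath question gets sorted out, here is a minimal sketch that sidesteps the pooling library entirely and opens one plain JDBC connection per partition; the URL, credentials, table name, and the assumption that the data is an RDD[String] are all placeholders:

import java.sql.DriverManager
import org.apache.spark.rdd.RDD

def saveToPostgres(rdd: RDD[String]): Unit =
  rdd.foreachPartition { lines =>
    // One connection per partition; the Postgres jar still has to be on the executor classpath.
    val conn = DriverManager.getConnection("jdbc:postgresql://hostname:5432/dbname", "user", "secret")
    val stmt = conn.prepareStatement("INSERT INTO log_lines (line) VALUES (?)")
    try {
      lines.foreach { line =>
        stmt.setString(1, line)
        stmt.executeUpdate()
      }
    } finally {
      stmt.close()
      conn.close()
    }
  }

A real version would batch the inserts (addBatch/executeBatch), but the shape is the point: connection setup happens once per partition, not once per record.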
Re: using a database connection pool to write data into an RDBMS from a Spark application
Hm, others can correct me if I'm wrong, but is this what SPARK_CLASSPATH is for? On Fri, Feb 20, 2015 at 6:04 PM, Mohammed Guller moham...@glassbeam.com wrote: It looks like spark.files.userClassPathFirst gives precedence to user libraries only on the worker nodes. Is there something similar to achieve the same behavior on the master? BTW, I am running Spark in stand-alone mode. Mohammed -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Friday, February 20, 2015 9:42 AM To: Mohammed Guller Cc: Kelvin Chu; user@spark.apache.org Subject: Re: using a database connection pool to write data into an RDBMS from a Spark application Have a look at spark.yarn.user.classpath.first and spark.files.userClassPathFirst for a possible way to give your copy of the libs precedence. On Fri, Feb 20, 2015 at 5:20 PM, Mohammed Guller moham...@glassbeam.com wrote: Sean, I know that Class.forName is not required since Java 1.4 :-) It was just a desperate attempt to make sure that the Postgres driver is getting loaded. Since Class.forName("org.postgresql.Driver") is not throwing an exception, I assume that the driver is available in the classpath. Is that not true? I did some more troubleshooting and here is what I found: 1) The Hive libraries used by Spark use BoneCP 0.7.1 2) When the Spark master is started, it initializes BoneCP, which will not load any database driver at that point (that makes sense) 3) When my application initializes BoneCP, it thinks it is already initialized and does not load the Postgres driver (this is a known bug in 0.7.1). This bug is fixed in the BoneCP 0.8.0 release. So I linked my app with the BoneCP 0.8.0 release, but when I run my app using spark-submit, Spark continues to use BoneCP 0.7.1. How do I override that behavior? How do I make the spark-submit script unload BoneCP 0.7.1 and load BoneCP 0.8.0? I tried the --jars and --driver-classpath flags, but it didn't help. Thanks, Mohammed -Original Message- From: Sean Owen [mailto:so...@cloudera.com] Sent: Friday, February 20, 2015 2:06 AM To: Mohammed Guller Cc: Kelvin Chu; user@spark.apache.org Subject: Re: using a database connection pool to write data into an RDBMS from a Spark application Although I don't know if it's related, the Class.forName() method of loading drivers is very old. You should be using DataSource and javax.sql; this has been the usual practice since about Java 1.4. Why do you say a different driver is being loaded? That's not the error here. Try instantiating the driver directly to test whether it's available in the classpath. Otherwise you would have to check whether the jar exists, the class exists in it, and it's really on your classpath. On Fri, Feb 20, 2015 at 5:27 AM, Mohammed Guller moham...@glassbeam.com wrote: Hi Kelvin, Yes. I am creating an uber jar with the Postgres driver included, but nevertheless tried both --jars and --driver-classpath flags. It didn’t help. Interestingly, I can’t use BoneCP even in the driver program when I run my application with spark-submit. I am getting the same exception when the application initializes BoneCP before creating SparkContext. It looks like Spark is loading a different version of the Postgres JDBC driver than the one that I am linking against.
Mohammed From: Kelvin Chu [mailto:2dot7kel...@gmail.com] Sent: Thursday, February 19, 2015 7:56 PM To: Mohammed Guller Cc: user@spark.apache.org Subject: Re: using a database connection pool to write data into an RDBMS from a Spark application Hi Mohammed, Did you use --jars to specify your jdbc driver when you submitted your job? Take a look at this link: http://spark.apache.org/docs/1.2.0/submitting-applications.html Hope this helps! Kelvin On Thu, Feb 19, 2015 at 7:24 PM, Mohammed Guller moham...@glassbeam.com wrote: Hi – I am trying to use BoneCP (a database connection pooling library) to write data from my Spark application to an RDBMS. The database inserts are inside a foreachPartition code block. I am getting this exception when the code tries to insert data using BoneCP: java.sql.SQLException: No suitable driver found for jdbc:postgresql://hostname:5432/dbname I tried explicitly loading the Postgres driver on the worker nodes by adding the following line inside the foreachPartition code block: Class.forName("org.postgresql.Driver") It didn’t help. Has anybody been able to get a database connection pool library to work with Spark? If you got it working, can you please share the steps? Thanks, Mohammed
Re: Setting the number of executors in standalone mode
Hi, Currently, there is only one executor per worker. There is a JIRA ticket to relax this: https://issues.apache.org/jira/browse/SPARK-1706 But if you want to use more cores, you can try increasing SPARK_WORKER_INSTANCES. It increases the number of workers per machine. Take a look here: http://spark.apache.org/docs/1.2.0/spark-standalone.html Hope this helps! Kelvin On Fri, Feb 20, 2015 at 10:08 AM, Mohammed Guller moham...@glassbeam.com wrote: AFAIK, in standalone mode, each Spark application gets one executor on each worker. You could run multiple workers on a machine though. Mohammed *From:* Yiannis Gkoufas [mailto:johngou...@gmail.com] *Sent:* Friday, February 20, 2015 9:48 AM *To:* Mohammed Guller *Cc:* user@spark.apache.org *Subject:* Re: Setting the number of executors in standalone mode Hi Mohammed, thanks a lot for the reply. Ok, so from what I understand I cannot control the number of executors per worker in standalone cluster mode. Is that correct? BR On 20 February 2015 at 17:46, Mohammed Guller moham...@glassbeam.com wrote: SPARK_WORKER_MEMORY=8g will allocate 8GB of memory to Spark on each worker node. It has nothing to do with the # of executors. Mohammed *From:* Yiannis Gkoufas [mailto:johngou...@gmail.com] *Sent:* Friday, February 20, 2015 4:55 AM *To:* user@spark.apache.org *Subject:* Setting the number of executors in standalone mode Hi there, I am trying to increase the number of executors per worker in standalone mode and have failed to achieve that. I followed the instructions in this thread: http://stackoverflow.com/questions/26645293/spark-configuration-memory-instance-cores and set: spark.executor.memory 1g SPARK_WORKER_MEMORY=8g hoping to get 8 executors per worker, but it's still 1. And the option num-executors is not available in standalone mode. Thanks a lot!
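Concretely, Kelvin's workaround would look something like this in conf/spark-env.sh on each worker machine; the numbers are illustrative, with four 2g workers fitting the 8g budget from the original post:

SPARK_WORKER_INSTANCES=4
SPARK_WORKER_MEMORY=2g
SPARK_WORKER_CORES=2

Note that with multiple instances, SPARK_WORKER_MEMORY applies per worker instance, so it usually has to be scaled down accordingly.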
Re: output worker stdout to one place
Hi Anny, You could play with creating your own log4j.properties that will write the output somewhere else (e.g. to some remote mount, or remote syslog). Sorry, but I don't have an example handy. Alternatively, if you can use YARN, it will collect all logs after the job is finished and make them available as a single file using the yarn logs command. On Fri, Feb 20, 2015 at 11:31 AM, anny9699 anny9...@gmail.com wrote: Hi, I am wondering if there's some way to send some of the worker stdout to one place instead of leaving it in each worker's stdout. For example, I have the following code: RDD.foreach { line => try { /* do something */ } catch { case e: Exception => println(line) } } Every time I want to check what's causing the exception, I have to check one worker after another in the UI, because I don't know which worker will be dealing with the exception case. Is there a way that the println could print to one place instead of separate worker stdouts, so that I only need to check one place? Thanks a lot! Anny -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/output-worker-stdout-to-one-place-tp21742.html -- Marcelo
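Since Marcelo mentions not having an example handy, a rough, untested log4j.properties sketch for shipping executor log output to a remote syslog host might look like this (the hostname and facility are placeholders; note that a bare println goes to stdout, so the job would also need to log through a Logger for this to catch it):

log4j.rootCategory=INFO, syslog
log4j.appender.syslog=org.apache.log4j.net.SyslogAppender
log4j.appender.syslog.syslogHost=loghost.example.com
log4j.appender.syslog.facility=LOCAL1
log4j.appender.syslog.layout=org.apache.log4j.PatternLayout
log4j.appender.syslog.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n

The file then has to reach each executor, e.g. by placing it on the workers and pointing spark.executor.extraJavaOptions at it with -Dlog4j.configuration=file:/path/to/log4j.properties.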
output worker stdout to one place
Hi, I am wondering if there's some way to send some of the worker stdout to one place instead of leaving it in each worker's stdout. For example, I have the following code: RDD.foreach { line => try { /* do something */ } catch { case e: Exception => println(line) } } Every time I want to check what's causing the exception, I have to check one worker after another in the UI, because I don't know which worker will be dealing with the exception case. Is there a way that the println could print to one place instead of separate worker stdouts, so that I only need to check one place? Thanks a lot! Anny -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/output-worker-stdout-to-one-place-tp21742.html
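One other pattern that may fit here (a sketch: rdd stands for the RDD above, and doSomething for the real per-record work): instead of printing on the executors, return the offending lines and collect them to the driver, which has a single stdout:

val badLines = rdd.flatMap { line =>
  try {
    doSomething(line)                 // placeholder for the real work
    None                              // processed fine; contribute nothing
  } catch {
    case e: Exception => Some(line)   // surface the bad input instead of printing it remotely
  }
}
badLines.collect().foreach(println)   // one place to look, on the driver

This assumes the set of failing lines is small enough to collect; otherwise badLines.saveAsTextFile to a shared location would serve the same purpose.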
Use Spark Streaming for Batch?
We have a sophisticated Spark Streaming application that we have been using successfully in production for over a year to process a time series of events. Our application makes novel use of updateStateByKey() for state management. We now have the need to perform exactly the same processing on input data that's not real-time, but has been persisted to disk. We do not want to rewrite our Spark Streaming app unless we have to. Might it be possible to perform large-batch processing on HDFS time-series data using Spark Streaming? 1. I understand that there is not currently an InputDStream that could do what's needed. I would have to create such a thing. 2. Time is a problem. I would have to use the timestamps on our events for any time-based logic and state management. 3. The batch duration would become meaningless in this scenario. Could I just set it to something really small (say 1 second) and then let it fall behind, processing the data as quickly as it could? It all seems possible. But could Spark Streaming work this way? If I created a DStream that delivered (say) months of events, could Spark Streaming effectively process this in a batch fashion? Any and all comments/ideas welcome! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Use-Spark-Streaming-for-Batch-tp21745.html
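On point 2, the event-time bookkeeping might look roughly like the sketch below; this is not the poster's actual app, and Event, the key field, and the 1-minute bucket are all hypothetical:

import org.apache.spark.streaming.StreamingContext._   // pair-DStream implicits (needed pre-1.3)
import org.apache.spark.streaming.dstream.DStream

case class Event(ts: Long, key: String)

// Key every record by its own timestamp bucket, so state follows event time
// rather than Spark's batch clock. updateStateByKey requires ssc.checkpoint(...)
// to be set at runtime.
def countByEventMinute(events: DStream[Event]): DStream[((Long, String), Long)] =
  events
    .map(e => ((e.ts / 60000L, e.key), 1L))
    .updateStateByKey[Long]((newValues: Seq[Long], state: Option[Long]) =>
      Some(state.getOrElse(0L) + newValues.sum))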
using hivecontext with sparksql on cdh 5.3
I am trying to access a Hive table using Spark SQL, but I am having trouble. I followed the instructions in a Cloudera community board, which stated:

1) Import the Hive jars into the classpath: export SPARK_CLASSPATH=$(find /data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/ -name '*.jar' -print0 | sed 's/\x0/:/g')
2) Start the Spark shell: spark-shell
3) Create a Hive context: val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
4) Run the query: sqlContext.sql("FROM analytics.trainingdatafinal SELECT *").collect().foreach(println)

When I do this it seems that it cannot find the table in the Hive metastore. I have put all of my Cloudera parcels in the partition starting with /data, as opposed to the default location used by Cloudera. Any suggestions on what can be done? I am putting the error below:

15/02/20 13:43:01 ERROR Hive: NoSuchObjectException(message:analytics.trainingdatafinal table not found) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1569) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:106) at com.sun.proxy.$Proxy24.get_table(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1008) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:90) at com.sun.proxy.$Proxy25.getTable(Unknown Source) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:1000) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:974) at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70) at org.apache.spark.sql.hive.HiveContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141) at org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at
Re: Spark Performance on Yarn
Hi Sandy, I am also doing memory tuning on YARN. Just want to confirm, is it correct to say: spark.executor.memory - spark.yarn.executor.memoryOverhead = the memory I can actually use in my JVM application? If it is not, what is the correct relationship? Any other variables or config parameters in play? Thanks. Kelvin On Fri, Feb 20, 2015 at 9:45 AM, Sandy Ryza sandy.r...@cloudera.com wrote: If that's the error you're hitting, the fix is to boost spark.yarn.executor.memoryOverhead, which will put some extra room in between the executor heap sizes and the amount of memory requested for them from YARN. -Sandy On Fri, Feb 20, 2015 at 9:40 AM, lbierman leebier...@gmail.com wrote: A bit more context on this issue, from the container logs on the executor. Given my cluster specs above, what would be appropriate parameters to pass in: --num-executors --num-cores --executor-memory? I had tried it with --executor-memory 2500MB. 2015-02-20 06:50:09,056 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=23320,containerID=container_1423083596644_0238_01_004160] is running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing container. Dump of the process-tree for container_1423083596644_0238_01_004160 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash -c /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms2400m -Xmx2400m -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/CoarseGrainedScheduler 8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout 2 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr |- 23323 23320 23320 23320 (java) 922271 12263 461976 724218 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms2400m -Xmx2400m -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p21739.html
Re: Spark Performance on Yarn
Hi Kelvin, spark.executor.memory controls the size of the executor heaps. spark.yarn.executor.memoryOverhead is the amount of memory to request from YARN beyond the heap size. This accounts for the fact that JVMs use some non-heap memory. The Spark heap is divided into spark.storage.memoryFraction (default 0.6) and spark.shuffle.memoryFraction (default 0.2), and the rest is for basic Spark bookkeeping and anything the user does inside UDFs. -Sandy On Fri, Feb 20, 2015 at 11:44 AM, Kelvin Chu 2dot7kel...@gmail.com wrote: Hi Sandy, I am also doing memory tuning on YARN. Just want to confirm, is it correct to say: spark.executor.memory - spark.yarn.executor.memoryOverhead = the memory I can actually use in my JVM application? If it is not, what is the correct relationship? Any other variables or config parameters in play? Thanks. Kelvin On Fri, Feb 20, 2015 at 9:45 AM, Sandy Ryza sandy.r...@cloudera.com wrote: If that's the error you're hitting, the fix is to boost spark.yarn.executor.memoryOverhead, which will put some extra room in between the executor heap sizes and the amount of memory requested for them from YARN. -Sandy On Fri, Feb 20, 2015 at 9:40 AM, lbierman leebier...@gmail.com wrote: A bit more context on this issue, from the container logs on the executor. Given my cluster specs above, what would be appropriate parameters to pass in: --num-executors --num-cores --executor-memory? I had tried it with --executor-memory 2500MB. 2015-02-20 06:50:09,056 WARN org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Container [pid=23320,containerID=container_1423083596644_0238_01_004160] is running beyond physical memory limits. Current usage: 2.8 GB of 2.7 GB physical memory used; 4.4 GB of 5.8 GB virtual memory used. Killing container.
Dump of the process-tree for container_1423083596644_0238_01_004160 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 23320 23318 23320 23320 (bash) 0 0 108650496 305 /bin/bash -c /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms2400m -Xmx2400m -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/CoarseGrainedScheduler 8 ip-10-99-162-56.ec2.internal 1 application_1423083596644_0238 1 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stdout 2 /var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160/stderr |- 23323 23320 23320 23320 (java) 922271 12263 461976 724218 /usr/java/latest/bin/java -server -XX:OnOutOfMemoryError=kill %p -Xms2400m -Xmx2400m -Djava.io.tmpdir=/dfs/yarn/nm/usercache/root/appcache/application_1423083596644_0238/container_1423083596644_0238_01_004160/tmp -Dspark.yarn.app.container.log.dir=/var/log/hadoop-yarn/container/application_1423083596644_0238/container_1423083596644_0238_01_004160 org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://sparkDriver@ip-10-168-86-13.ec2.internal:42535/user/Coarse -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Performance-on-Yarn-tp21729p21739.html
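To make the arithmetic behind the container kill concrete (assuming the era's default spark.yarn.executor.memoryOverhead of 384 MB): YARN's enforced limit is roughly the executor heap plus the overhead, i.e. 2400 MB + 384 MB = 2784 MB, about 2.7 GB, which is exactly the cap in the log above, while the JVM's heap plus non-heap usage crept past it to 2.8 GB. Within the 2400 MB heap itself, the default fractions Sandy quotes work out to about 0.6 * 2400 = 1440 MB for storage and 0.2 * 2400 = 480 MB for shuffle, leaving roughly 480 MB for everything else.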
Re: using hivecontext with sparksql on cdh 5.3
Chirag, This worked for us: spark-submit --master yarn-cluster --driver-class-path '/opt/cloudera/parcels/CDH/lib/hive/lib/*' --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hive/lib/*' ... Let me know if you have any issues. On Fri, Feb 20, 2015 at 2:43 PM, chirag lakhani chirag.lakh...@gmail.com wrote: I am trying to access a Hive table using Spark SQL, but I am having trouble. I followed the instructions in a Cloudera community board, which stated: 1) Import the Hive jars into the classpath: export SPARK_CLASSPATH=$(find /data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/ -name '*.jar' -print0 | sed 's/\x0/:/g') 2) Start the Spark shell: spark-shell 3) Create a Hive context: val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 4) Run the query: sqlContext.sql("FROM analytics.trainingdatafinal SELECT *").collect().foreach(println) When I do this it seems that it cannot find the table in the Hive metastore. I have put all of my Cloudera parcels in the partition starting with /data, as opposed to the default location used by Cloudera. Any suggestions on what can be done? I am putting the error below: 15/02/20 13:43:01 ERROR Hive: NoSuchObjectException(message:analytics.trainingdatafinal table not found) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1569) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:106) at com.sun.proxy.$Proxy24.get_table(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1008) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:90) at com.sun.proxy.$Proxy25.getTable(Unknown Source) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:1000) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:974) at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70) at org.apache.spark.sql.hive.HiveContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141) at org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:191) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:147) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at
Re: Spark 1.3 SQL Programming Guide and sql._ / sql.types._
Yeah, sorry. The programming guide has not been updated for 1.3. I'm hoping to get to that this weekend / next week. On Fri, Feb 20, 2015 at 9:55 AM, Denny Lee denny.g@gmail.com wrote: Quickly reviewing the latest SQL Programming Guide https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md (in github) I had a couple of quick questions: 1) Do we need to instantiate the SparkContext as per // sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) Within Spark 1.3 the sqlContext is already available, so we probably do not need to make this call. 2) Importing org.apache.spark.sql._ should bring in both the SQL data types, struct types, and Row // Import Spark SQL data types and Row. import org.apache.spark.sql._ Currently with Spark 1.3 RC1, it appears org.apache.spark.sql._ only brings in Row. scala> import org.apache.spark.sql._ import org.apache.spark.sql._ scala> val schema = | StructType( | schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) <console>:25: error: not found: value StructType StructType( But if I also import org.apache.spark.sql.types._ scala> import org.apache.spark.sql.types._ import org.apache.spark.sql.types._ scala> val schema = | StructType( | schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) schema: org.apache.spark.sql.types.StructType = StructType(StructField(DeviceMake,StringType,true), StructField(Country,StringType,true)) Wondering if this is by design or perhaps a quick documentation / package update is warranted.
Re: using hivecontext with sparksql on cdh 5.3
I tried spark-shell --master yarn-cluster --driver-class-path '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' and I get the following error: Error: Cluster deploy mode is not applicable to Spark shells. Run with --help for usage help or --verbose for debug output On Fri, Feb 20, 2015 at 2:52 PM, Sourigna Phetsarath gna.phetsar...@teamaol.com wrote: Chirag, This worked for us: spark-submit --master yarn-cluster --driver-class-path '/opt/cloudera/parcels/CDH/lib/hive/lib/*' --driver-java-options '-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hive/lib/*' ... Let me know if you have any issues. On Fri, Feb 20, 2015 at 2:43 PM, chirag lakhani chirag.lakh...@gmail.com wrote: I am trying to access a Hive table using Spark SQL, but I am having trouble. I followed the instructions in a Cloudera community board, which stated: 1) Import the Hive jars into the classpath: export SPARK_CLASSPATH=$(find /data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/ -name '*.jar' -print0 | sed 's/\x0/:/g') 2) Start the Spark shell: spark-shell 3) Create a Hive context: val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) 4) Run the query: sqlContext.sql("FROM analytics.trainingdatafinal SELECT *").collect().foreach(println) When I do this it seems that it cannot find the table in the Hive metastore. I have put all of my Cloudera parcels in the partition starting with /data, as opposed to the default location used by Cloudera. Any suggestions on what can be done? I am putting the error below: 15/02/20 13:43:01 ERROR Hive: NoSuchObjectException(message:analytics.trainingdatafinal table not found) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1569) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:106) at com.sun.proxy.$Proxy24.get_table(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:1008) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:90) at com.sun.proxy.$Proxy25.getTable(Unknown Source) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:1000) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:974) at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:70) at org.apache.spark.sql.hive.HiveContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:141) at scala.Option.getOrElse(Option.scala:120) at
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:141) at org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:253) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:143) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$5.applyOrElse(Analyzer.scala:138) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:162) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at
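Two things stand out in the invocation above: spark-shell can only run with a client deploy mode (hence the error message), and the executor classpath points at /opt/... while the parcels live under /data/opt/.... A corrected attempt might therefore look like the following (untested, paths taken from the original post):

spark-shell --master yarn-client --driver-class-path '/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*' --conf spark.executor.extraClassPath='/data/opt/cloudera/parcels/CDH-5.3.1-1.cdh5.3.1.p0.5/lib/hive/lib/*'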