Re: java.io.IOException: Filesystem closed
Sorry for the delayed response. Please find my application attached.

On Tue, Dec 2, 2014 at 12:04 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
What is the application that you are submitting? Looks like you might have invoked fs inside the app and then closed it within it. Thanks Best Regards

On Tue, Dec 2, 2014 at 11:59 AM, rapelly kartheek kartheek.m...@gmail.com wrote:
Hi, I face the following exception when I submit a Spark application. The log file shows:

14/12/02 11:52:58 ERROR LiveListenerBus: Listener EventLoggingListener threw an exception
java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:689)
    at org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:1668)
    at org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:1629)
    at org.apache.hadoop.hdfs.DFSOutputStream.sync(DFSOutputStream.java:1614)
    at org.apache.hadoop.fs.FSDataOutputStream.sync(FSDataOutputStream.java:120)
    at org.apache.spark.util.FileLogger$$anonfun$flush$2.apply(FileLogger.scala:158)
    at org.apache.spark.util.FileLogger$$anonfun$flush$2.apply(FileLogger.scala:158)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.util.FileLogger.flush(FileLogger.scala:158)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:87)
    at org.apache.spark.scheduler.EventLoggingListener.onJobEnd(EventLoggingListener.scala:112)
    at org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$4.apply(SparkListenerBus.scala:52)
    at org.apache.spark.scheduler.SparkListenerBus$$anonfun$postToAll$4.apply(SparkListenerBus.scala:52)
    at org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:81)
    at org.apache.spark.scheduler.SparkListenerBus$$anonfun$foreachListener$1.apply(SparkListenerBus.scala:79)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.SparkListenerBus$class.foreachListener(SparkListenerBus.scala:79)
    at org.apache.spark.scheduler.SparkListenerBus$class.postToAll(SparkListenerBus.scala:52)
    at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:32)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:56)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:56)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply(LiveListenerBus.scala:47)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
    at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:46)

Can someone please help me resolve this? Thanks

Attachment: SimpleApp001.scala (binary data)
Re: java.io.IOException: Filesystem closed
Your code seems to have a lot of threads, and I think you might be invoking sc.stop() before those threads have finished. Thanks Best Regards
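A minimal sketch (not the attached SimpleApp001.scala) of the shutdown ordering being suggested here: when each job runs on its own thread against a shared SparkContext, join all the threads before calling sc.stop(), so no job is still posting events once the context and its event log have been closed.

import org.apache.spark.{SparkConf, SparkContext}

object SafeShutdownSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SafeShutdownSketch"))

    // Several independent jobs launched from their own threads, sharing one context.
    val threads = (1 to 4).map { i =>
      new Thread(new Runnable {
        override def run(): Unit = {
          val sum = sc.parallelize(1 to 1000).map(_ * i).sum()
          println("thread " + i + " sum = " + sum)
        }
      })
    }

    threads.foreach(_.start())
    threads.foreach(_.join())  // wait for every job to finish ...
    sc.stop()                  // ... and only stop the context afterwards
  }
}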
Re: java.io.IOException: Filesystem closed
But, somehow, if I run this application a second time, I find that the application gets executed and the results come out, regardless of the same errors in the logs.
Re: java.io.IOException: Filesystem closed
It could be because those threads are finishing quickly. Thanks Best Regards
Re: java.io.IOException: Filesystem closed
Does the SparkContext shut itself down by default even if I don't stop it explicitly in my code? Because I ran the application without calling sc.stop(), and I still get the "Filesystem closed" error along with the correct output.
Re: Is Spark the right tool for me?
The point 4 looks weird to me, I mean if you intend to have such a workflow run in a single session (maybe consider a sessionless architecture). Is such a process run for each user? If that's the case, maybe finding a way to do it for all users at once would be better (more data but less scheduling). For the micro updates, considering something like a queue (kestrel? or even kafk... whatever, something that works) would be great. That way you remove the load off the instances, and the updates can be done at their own pace. Also, you can reuse it to notify the WMS. Isn't there a way to do tiling directly? Also, do you need indexes, I mean do you need the full OGIS power, or are just some classical operators enough (using BBox only for instance)? The more you can simplify the better :-D. These are only my 2c, it's hard to think or react appropriately without knowing the whole context. BTW, to answer your very first question: yes, it looks like Spark will help you! cheers, andy

On Mon Dec 01 2014 at 4:36:44 PM Stadin, Benjamin benjamin.sta...@heidelberg-mobil.com wrote:
Yes, the processing causes the most stress. But this is parallelizable by splitting the input source. My problem is that once the heavy preprocessing is done, I’m in a „micro-update“ mode so to say (the user-interactive part of the whole workflow). Then the map is rendered directly from the SQLite file by the map server instance on that machine – this is actually a favorable setup for me for resource consumption and implementation costs (I just need to tell the web UI to refresh after something was written to the db, and the map server will render the updates without me changing / coding anything). So my workflow requires breaking out of parallel processing for some time. Do you think my generalized workflow and tool chain can be like so?
1. Pre-process many files in a parallel way. Gather all results, deploy them on one single machine. => Spark coalesce() + Crunch (for splitting input files into separate tasks)
2. On the machine where the preprocessed results are, configure a map server to connect to the local SQLite source. Do user-interactive micro-updates on that file (web UI gets updated).
3. Post-process the files in parallel. => Spark + Crunch
4. Design all of the above as a workflow, runnable (or assignable) as part of a user session. => Oozie
Do you think this is ok? ~Ben

Von: andy petrella andy.petre...@gmail.com
Datum: Montag, 1. Dezember 2014 15:48
An: Benjamin Stadin benjamin.sta...@heidelberg-mobil.com, user@spark.apache.org
Betreff: Re: Is Spark the right tool for me?
Indeed. However, I guess the important load and stress is in the processing of the 3D data (DEM or alike) into geometries/shades/whatever. Hence you can use Spark (GeoTrellis can be tricky for 3D, poke @lossyrob for more info) to perform these operations, then keep an RDD of only the resulting geometries. Those geometries probably won't be that heavy, hence it might be possible to coalesce(1, true) to have the whole thing on one node (or if your driver is more beefy, do a collect/foreach) to create the index. You could also create a GeoJSON of the geometries and create the r-tree on it (not sure about this one).

On Mon Dec 01 2014 at 3:38:00 PM Stadin, Benjamin benjamin.sta...@heidelberg-mobil.com wrote:
Thank you for mentioning GeoTrellis. I haven't heard of this before. We have many custom tools and steps, I'll check whether our tools fit in. The end result is actually a 3D map for native OpenGL based rendering on iOS / Android [1]. I’m using GeoPackage, which is basically SQLite with R-Tree and a small library around it (more lightweight than SpatialLite). I want to avoid accessing the SQLite db from any other machine or task, that's where I thought I can use a long running task which is the only process responsible for updating a local-only stored SQLite db file. As you also said, SQLite (or mostly any other file based db) won't work well over a network. This isn't only limited to R-Tree but is an expected limitation because of file locking issues, as documented also by SQLite. I also thought to do the same thing when rendering the (web) maps: in combination with the db handler which does the actual changes, I thought to run a map server instance on each node and configure it to add the database location as a map source once the task starts. Cheers Ben
[1] http://www.deep-map.com

Von: andy petrella andy.petre...@gmail.com
Datum: Montag, 1. Dezember 2014 15:07
An: Benjamin Stadin benjamin.sta...@heidelberg-mobil.com, user@spark.apache.org
Betreff: Re: Is Spark the right tool for me?
Not quite sure which geo processing you're doing, is it raster or vector? More info would be appreciated for me to help you further. Meanwhile I can try to give some hints, for instance, did you consider GeoMesa http://www.geomesa.org/2014/08/05/spark/? Since you need a
Is it possible to just change the value of the items in RDD without making a full copy?
Hi, I'd like to make an operation on an RDD that ONLY changes the value of some items, without making a full copy or a full scan of the data. It is useful when I need to handle a large RDD, and each time I need only to change a small fraction of the data, keeping the other data unchanged. Certainly I don't want to make a full copy of the data into the new RDD. For example, suppose I have an RDD that contains integer data from 0 to 100. What I want is to change the first element of the RDD from 0 to 1, with the other elements untouched. I tried this, but it doesn't work:

var rdd = sc.parallelize(Range(0, 100))
rdd.mapPartitions({ iter => iter(0) = 1 })

The reported error is: value update is not a member of Iterator[Int]

Does anyone know how to make it work?
Re: Is Spark the right tool for me?
To be precise, I want the workflow to be associated with a user, but it doesn't need to be run as part of or depend on a session. I can't run scheduled jobs, because a user can potentially upload hundreds of files which trigger a long-running batch import / update process, but he could also make a very small upload / update and immediately want to continue to work on the (temporary) data that he just uploaded. So that same workflow's duration may vary between some seconds, a minute and hours, completely depending on the project's size. So a user can log off and on again to the web site, and the initial upload + conversion step may either be still running or finished. He'll see the progress on the web site, and once the initial processing is done he can continue with the next step of the import workflow; he can interactively change some stuff on that temporary data. After he is done changing stuff, he can hit a „continue“ button which triggers again a long or short running post-processing pipe. Then the user can make a final review of that now post-processed data, and after hitting a „save“ button a final commit pipe pushes / merges the until-now temporary data to some persistent store.

You're completely right that I should simplify as much as possible. Finding the right mix seems key. I've also considered using Kafka to message between the Web UI and the pipes; I think it will fit. Chaining the pipes together as a workflow and implementing, managing and monitoring these long-running user tasks with locality as I need them is still causing me headache.

Btw, the tiling and indexing is not a problem. My problem is mainly in parallelized conversion, polygon creation, cleaning of CAD file data (e.g. GRASS, prepair, custom tools). After all parts have been preprocessed and gathered in one place, the initial creation of the preview geo file takes a fraction of the time (inserting all data in one transaction, taking somewhere between sub-second and 10 seconds for very large projects). It's currently not a concern. (Searching for a Kafka+Spark example now.)

Cheers Ben
Re: Is Spark the right tool for me?
You might also have to check the Spark JobServer, it could help you at some point.
Re: Is it possible to just change the value of the items in RDD without making a full copy?
RDDs are immutable, so if you want to change the value of an RDD then you have to create another RDD from it by applying some transformation. Not sure if this is what you are looking for:

val rdd = sc.parallelize(Range(0, 100))
val rdd2 = rdd.map(x => {
  println("Value : " + x)
  var ret = 1
  if (x != 0) ret = x
  ret
})
rdd2.collect()

Thanks Best Regards
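If only the very first element really has to change, a sketch (not from the thread) using mapPartitionsWithIndex keeps the rewriting confined to partition 0; it still produces a new RDD, since RDDs stay immutable:

val rdd = sc.parallelize(Range(0, 100))

// Only partition 0 is rewritten; every other partition's iterator passes through untouched.
val rdd2 = rdd.mapPartitionsWithIndex { (partIdx, iter) =>
  if (partIdx == 0 && iter.hasNext) {
    val first = iter.next()
    Iterator(if (first == 0) 1 else first) ++ iter
  } else {
    iter
  }
}

rdd2.collect()  // 1, 1, 2, 3, ..., 99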
Re: Is it possible to just change the value of the items in RDD without making a full copy?
You cannot modify an RDD in mapPartitions, because RDDs are immutable. Once you apply a transform function on an RDD, it produces a new RDD. If you just want to modify only a fraction of the total RDD, try to collect the new value list to the driver, or use a broadcast variable after each iteration, rather than updating the RDD. It's similar to SGD in MLlib.
Re: Is it possible to just change the value of the items in RDD without making a full copy?
Although it feels like you are copying an RDD when you map it, it is not necessarily literally being copied. Your map function may pass through most objects unchanged, so there may not be as much overhead as you think. I don't think you can avoid a scan of the data unless you can somehow know that whole partitions do not need to be touched. If this still doesn't work, you may need to reconsider your design, as it may not be a great fit for the RDD model. And no, you can't assign to Iterators like that.
Re: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down
Any ideas? Has anyone got the same error?

On Mon, Dec 1, 2014 at 2:37 PM, Alexey Romanchuk alexey.romanc...@gmail.com wrote:
Hello Spark users! I found lots of strange messages in the driver log. Here they are:

2014-12-01 11:54:23,849 [sparkDriver-akka.actor.default-dispatcher-25] ERROR akka.remote.EndpointWriter[akka://sparkDriver/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FsparkExecutor%40data1.hadoop%3A17372-5/endpointWriter] - AssociationError [akka.tcp://sparkDriver@10.54.87.173:55034] - [akka.tcp://sparkExecutor@data1.hadoop:17372]: Error [Shut down address: akka.tcp://sparkExecutor@data1.hadoop:17372] [
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://sparkExecutor@data1.hadoop:17372
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.
]

I get this message twice for every worker: first for driverPropsFetcher and next for sparkExecutor. It looks like Spark shuts down the remote akka system incorrectly, or there is some race condition in this process and the driver sent some data to the worker while the worker's actor system was already in a shutdown state. Except for this message everything works fine, but this is an ERROR-level message and I found it in my ERROR-only log. Do you have any idea whether it is a configuration issue, a bug in Spark or akka, or something else? Thanks!
Parallelize independent tasks
Hi folks, We have written a Spark job that scans multiple HDFS directories and performs transformations on them. For now, this is done with a simple for loop that starts one task at each iteration. This looks like:

dirs.foreach { case (src, dest) => sc.textFile(src).process.saveAsFile(dest) }

However, each iteration is independent, and we would like to optimize that by running them with Spark simultaneously (or in a chained fashion), such that we don't have idle executors at the end of each iteration (some directories sometimes only require one partition). Has anyone already done such a thing? How would you suggest we do that? Cheers, Anselme
Does filter on an RDD scan every data item ?
Hi, I wanted some clarity into the functioning of the filter function of RDD.
1) Does the filter function scan every element saved in the RDD? If my RDD represents 10 million rows and I want to work on only 1000 of them, is there an efficient way of filtering the subset without having to scan every element?
2) If my RDD represents a key/value data set, when I filter this data set of 10 million rows, can I specify that the search should be restricted to only partitions which contain specific keys? Will Spark run the filter operation on all partitions if the partitions are done by key, irrespective of whether the key exists in a partition or not?
pySpark saveAsSequenceFile append overwrite
Dear Spark community, Does the pySpark saveAsSequenceFile(folder) method have the ability to append the new sequence file to another one, or to overwrite an existing sequence file? If the folder already exists then I get an error message... Thank you! Csaba
IP to geo information in spark streaming application
Hi, I'm new to Spark Streaming. I'm currently writing a Spark Streaming application to standardize events coming from Kinesis. As part of the logic, I want to use an IP-to-geo information library or service. My questions:
1) If I were to use a REST service for this task, do you think it would cause a performance penalty (compared to a library based solution)?
2) If I were to use a library based solution, I would have to use a local db file. What mechanism should I use in order to transfer such a db file? A broadcast variable?
Tx, Noam.
Re: Does filter on an RDD scan every data item ?
Hi, For your first question, I think that we can use sc.parallelize(rdd.take(1000)). For your second question, I am not sure, but I don't think that we can restrict the filter to certain partitions without scanning every element. Cheers, Gen
Re: MLlib Naive Bayes classifier confidence
Are we sure that exponentiating will give us the probabilities? I did some tests by cloning the MLlib class and adding the required code, but the calculated probabilities do not add up to 1. I tried something like:

def predictProbs(testData: Vector): (BDV[Double], BDV[Double]) = {
  val logProbs = brzPi + brzTheta * new BDV[Double](testData.toArray)
  val probs = logProbs.map(x => math.exp(x))
  (logProbs, probs)
}

This was because I need the actual probs to process downstream from this...
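Assuming the only issue is that these class scores are unnormalised log-likelihoods, a sketch (building on the snippet above, with brzPi, brzTheta and Vector as in the cloned MLlib class) that normalises with log-sum-exp so the exponentiated values sum to 1:

def predictProbs(testData: Vector): BDV[Double] = {
  val logScores = brzPi + brzTheta * new BDV[Double](testData.toArray)
  // log-sum-exp, shifted by the max for numerical stability
  val maxLog = breeze.linalg.max(logScores)
  val logZ = maxLog + math.log(breeze.linalg.sum(logScores.map(s => math.exp(s - maxLog))))
  logScores.map(s => math.exp(s - logZ))  // per-class probabilities that sum to 1
}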
Re: Is Spark the right tool for me?
> I've also considered to use Kafka to message between Web UI and the pipes, I think it will fit. Chaining the pipes together as a workflow and implementing, managing and monitoring these long running user tasks with locality as I need them is still causing me headache.

You can look at Apache Samza for chaining the pipes together and managing long-running tasks with locality. The long-running tasks run on YARN, can keep fault-tolerant local state (using LevelDB or RocksDB), and use Kafka for message passing and state replication/persistence.
Re: IP to geo information in spark streaming application
1. If you use some custom API library, there's a chance you end up with serialization errors and all, but a normal HTTP REST API would work fine, except there could be a bit of a performance lag, plus those APIs might limit the number of requests.
2. I would go for this approach: either I would broadcast the IP data, or I would cache it in a normal RDD and then join it with the stream data.
Thanks Best Regards
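A rough sketch of option (2), assuming the geo data is small enough to hold in memory; loadGeoTable, eventStream and the event's ip field are placeholders rather than a real API, and ssc is the StreamingContext:

// Build a plain in-memory lookup table on the driver (hypothetical loader).
val geoTable: Map[String, String] = loadGeoTable("/path/to/ip-to-geo.db")

// Ship it once to every executor instead of once per record.
val geoBroadcast = ssc.sparkContext.broadcast(geoTable)

// Enrich each incoming event locally, with no remote call per record.
val enriched = eventStream.map { event =>
  val geo = geoBroadcast.value.getOrElse(event.ip, "unknown")
  (event, geo)
}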
Re: Parallelize independent tasks
dirs.par.foreach { case (src, dest) => sc.textFile(src).process.saveAsFile(dest) }

Is that sufficient for you?
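An alternative sketch along the same lines: submit each per-directory job from its own Future so they overlap inside one SparkContext (with spark.scheduler.mode=FAIR the concurrent jobs can also share executors); process and saveAsFile are kept from the original pseudo-code.

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// One Spark job per (src, dest) pair, all submitted concurrently from the driver.
val jobs = dirs.map { case (src, dest) =>
  Future { sc.textFile(src).process.saveAsFile(dest) }
}

// Block until every job has finished before moving on (or stopping the context).
Await.result(Future.sequence(jobs), Duration.Inf)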
sort algorithm using sortBy
I am trying to understand the sort algorithm that is used in RDD#sortBy. I have read that post from Matei http://apache-spark-user-list.1001560.n3.nabble.com/Complexity-Efficiency-of-SortByKey-tp14328p14332.html and that helps a little bit already. I'd like to further understand the distributed merge-sort because in my case the sort takes 10 times longer if it happens on a field whose values are not well distributed (the field's value is 0 for many of the items) compared to a sort on a field whose values are better distributed. In particular, I am wondering if the sort algorithm can be modified/injected with one that would better fit the first distribution (given that this would be known in advance). I'll be happy to look at the code myself, if someone could provide me with a pointer to the file(s) I should have a look at.
Re: Loading RDDs in a streaming fashion
This is a common use case, and this is how the Hadoop APIs for reading data work: they return an Iterator[YourRecord] instead of reading every record in at once.

On Dec 1, 2014 9:43 PM, Andy Twigg andy.tw...@gmail.com wrote:
You may be able to construct RDDs directly from an iterator - not sure - you may have to subclass your own.

On 1 December 2014 at 18:40, Keith Simmons ke...@pulse.io wrote:
Yep, that's definitely possible. It's one of the workarounds I was considering. I was just curious if there was a simpler (and perhaps more efficient) approach. Keith

On Mon, Dec 1, 2014 at 6:28 PM, Andy Twigg andy.tw...@gmail.com wrote:
Could you modify your function so that it streams through the files record by record and outputs them to hdfs, then read them all in as RDDs and take the union? That would only use bounded memory.

On 1 December 2014 at 17:19, Keith Simmons ke...@pulse.io wrote:
Actually, I'm working with a binary format. The api allows reading out a single record at a time, but I'm not sure how to get those records into Spark (without reading everything into memory from a single file at once).

On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg andy.tw...@gmail.com wrote:
> file => transform file into a bunch of records
What does this function do exactly? Does it load the file locally? Spark supports RDDs exceeding global RAM (cf the terasort example), but if your example just loads each file locally, then this may cause problems. Instead, you should load each file into an rdd with context.textFile(), flatMap that and union these rdds. Also see http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files

On 1 December 2014 at 16:50, Keith Simmons ke...@pulse.io wrote:
This is a long shot, but... I'm trying to load a bunch of files spread out over hdfs into an RDD, and in most cases it works well, but for a few very large files, I exceed available memory. My current workflow basically works like this:

context.parallelize(fileNames).flatMap { file =>
  transform file into a bunch of records
}

I'm wondering if there are any APIs to somehow flush the records of a big dataset so I don't have to load them all into memory at once. I know this doesn't exist, but conceptually:

context.parallelize(fileNames).streamMap { (file, stream) =>
  for every 10K records write records to stream and flush
}

Keith
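A sketch of the "stream through each file" idea discussed above: have the flatMap return a lazy Iterator so Spark pulls records one at a time instead of materialising a whole file in memory. MyRecord and readRecords stand in for the custom binary format and are not a real API; sc is the SparkContext (called context in the original snippet).

case class MyRecord(payload: Array[Byte])  // placeholder record type

// Hypothetical reader: opens `path` and yields one record per next() call.
def readRecords(path: String): Iterator[MyRecord] = {
  Iterator.empty  // stand-in for the real record-at-a-time reader
}

// flatMap accepts a TraversableOnce, so the Iterator is consumed lazily,
// keeping only a bounded number of records in memory per task.
val records = sc.parallelize(fileNames).flatMap { file =>
  readRecords(file)
}

records.saveAsObjectFile("hdfs:///tmp/records")  // example sink path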
Re: Low Level Kafka Consumer for Spark
Hi Dibyendu,

What are your thoughts on keeping this solution (or not), considering that Spark Streaming v1.2 will have built-in recoverability of the received data? https://issues.apache.org/jira/browse/SPARK-1647

I'm concerned about the added complexity and performance overhead of this solution, given the writing of big amounts of data into HDFS on a small batch interval. https://docs.google.com/document/d/1vTCB5qVfyxQPlHuv8rit9-zjdttlgaSrMgfCDQlCJIM/edit?pli=1# (diagram: http://apache-spark-user-list.1001560.n3.nabble.com/file/n20181/spark_streaming_v.png)

I think the whole solution is well designed and thought out, but I'm afraid it doesn't really fit all needs with checkpoint-based technologies like Flume or Kafka, because it hides the management of the offset away from the user code. If instead of saving received data into HDFS, the ReceiverHandler were to save some metadata (such as the offset in the case of Kafka) specified by the custom receiver passed into the StreamingContext, then upon driver restart that metadata could be used by the custom receiver to recover the point from which it should start receiving data once more.

Anyone's comments will be greatly appreciated.

Tnks, Rod
Re: Spark setup on local windows machine
Thanks Sameer and Akhil for your help. I tried both your suggestions; however, I still face the same issue. There was indeed a space in the installation path for Scala and sbt, since I had let the defaults stay and hence the path was C:\Program Files. I reinstalled Scala and sbt in C:\ as well as Spark. The Spark binaries I am using are built internally from source by our team, and the same binary works for the rest of the team. Here is what the spark-env.cmd looks like for me:

set SCALA_HOME=C:\scala
set SPARK_CLASSPATH=C:\spark\bin\..\conf;C:\spark\bin\..\lib\spark-assembly-1.1.0-hadoop2.3.0.jar;C:\spark\bin\..\lib\datanucleus-api-jdo-3.2.1.jar;C:\spark\bin\..\lib\datanucleus-core-3.2.2.jar;C:\spark\bin\..\lib\datanucleus-rdbms-3.2.1.jar;C:\scala\lib\scala-library.jar;C:\scala\lib\scala-compiler.jar;

I get the same error in spite of this. The compute-classpath.cmd yields correct results:

C:\spark\bin>spark-shell
Exception in thread main java.util.NoSuchElementException: key not found: CLASSPATH
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:58)
    at org.apache.spark.deploy.SparkSubmitDriverBootstrapper$.main(SparkSubmitDriverBootstrapper.scala:49)
    at org.apache.spark.deploy.SparkSubmitDriverBootstrapper.main(SparkSubmitDriverBootstrapper.scala)

regards Sunita

On Tue, Nov 25, 2014 at 11:47 PM, Sameer Farooqui same...@databricks.com wrote:
Hi Sunita, This gitbook may also be useful for you to get Spark running in local mode on your Windows machine: http://blueplastic.gitbooks.io/how-to-light-your-spark-on-a-stick/content/

On Tue, Nov 25, 2014 at 11:09 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
You could try following these guidelines http://docs.sigmoidanalytics.com/index.php/How_to_build_SPARK_on_Windows Thanks Best Regards

On Wed, Nov 26, 2014 at 12:24 PM, Sunita Arvind sunitarv...@gmail.com wrote:
Hi All, I just installed Spark on my laptop and am trying to get spark-shell to work. Here is the error I see:

C:\spark\bin>spark-shell
Exception in thread main java.util.NoSuchElementException: key not found: CLASSPATH
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:58)
    at org.apache.spark.deploy.SparkSubmitDriverBootstrapper$.main(SparkSubmitDriverBootstrapper.scala:49)
    at org.apache.spark.deploy.SparkSubmitDriverBootstrapper.main(SparkSubmitDriverBootstrapper.scala)

The classpath seems to be right:

C:\spark\bin>compute-classpath.cmd
;;C:\spark\bin\..\conf;C:\spark\bin\..\lib\spark-assembly-1.1.0-hadoop2.3.0.jar;;C:\spark\bin\..\lib\datanucleus-api-jdo-3.2.1.jar;C:\spark\bin\..\lib\datanucleus-core-3.2.2.jar;C:\spark\bin\..\lib\datanucleus-rdbms-3.2.1.jar

Manually exporting the classpath to include the assembly jar doesn't help either. What could be wrong with this installation? Scala and SBT are installed, are in the path and are working fine. Appreciate your help. regards Sunita
Re: Negative Accumulators
Similarly, I'm having an issue with the above solution when I use the math.min() function to add to an accumulator. I'm seeing negative overflow numbers again. This code works fine without the math.min() and even if I add an arbitrarily large number like 100 // doesn't work someRDD.foreach(x={ myAccumulator+=math.min(x._1, 100) }) //works someRDD.foreach(x={ myAccumulator+=x._1+100 }) Any ideas? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Negative-Accumulators-tp19706p20183.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Scala Dependency Injection
This is a more of a Scala question than Spark question. Which Dependency Injection framework do you guys use for Scala when using Spark? Is http://scaldi.org/ recommended? Regards Venkat -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Scala-Dependency-Injection-tp20185.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Help understanding - Not enough space to cache rdd
I am running in local mode. I am using a google n1-highmem-16 (16 vCPU, 104 GB memory) machine. I have allocated SPARK_DRIVER_MEMORY=95g. I see "Memory: 33.6 GB Used (73.7 GB Total)" for the executor. In the log output below, I see 33.6 GB of blocks used by the 2 rdds that I have cached. I should still have 40.2 GB left. However, I see messages like: 14/12/02 18:15:04 WARN storage.MemoryStore: Not enough space to cache rdd_15_9 in memory! (computed 8.1 GB so far) 14/12/02 18:15:04 INFO storage.MemoryStore: Memory use = 33.6 GB (blocks) + 40.1 GB (scratch space shared across 14 thread(s)) = 73.7 GB. Storage limit = 73.7 GB. 14/12/02 18:15:04 WARN spark.CacheManager: Persisting partition rdd_15_9 to disk instead. . . . . Further down I see: 4/12/02 18:30:08 INFO storage.BlockManagerInfo: Added rdd_15_9 on disk on localhost:41889 (size: 6.9 GB) 4/12/02 18:30:08 INFO storage.BlockManagerMaster: Updated info of block rdd_15_9 14/12/02 18:30:08 ERROR executor.Executor: Exception in task 9.0 in stage 2.0 (TID 348) java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE I don't understand a couple of things: 1) In this case, I am joining 2 RDDs (sizes 16.3 GB and 17.2 GB); both rdds are created by reading files from HDFS. The size of each .part file is 24.87 MB and I am reading these files into 250 partitions, so I shouldn't have any individual partition over 25 MB; how could rdd_15_9 be 8.1 GB? 2) Even if the data is 8.1 GB, Spark should have enough memory to write it, but I would expect to hit the Integer.MAX_VALUE 2 GB block limitation. However, I don't get that error at this point, and a partial dataset (6.9 GB) is written to disk. I don't understand how and why only a partial dataset is written. 3) Why do I get java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE after writing the partial dataset? I would love to hear from anyone who can shed some light on this... -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Help-understanding-Not-enough-space-to-cache-rdd-tp20186.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
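A minimal sketch of one way to attack points (1) and (2): spread the join output over more partitions before caching, so that no single cached block approaches the 2 GB limit the error refers to. This is an assumption-based illustration rather than a diagnosis; parseLeft, parseRight and the paths are hypothetical stand-ins for whatever produces the (key, value) pairs being joined.

    import org.apache.spark.storage.StorageLevel

    // read with plenty of input partitions, as in the original job
    val left  = sc.textFile("hdfs:///data/left",  250).map(parseLeft)   // hypothetical: returns (key, value)
    val right = sc.textFile("hdfs:///data/right", 250).map(parseRight)  // hypothetical: returns (key, value)

    // a join can still funnel a skewed key into one very large partition;
    // repartitioning the result spreads it over more, smaller blocks before caching
    val joined = left.join(right)
      .repartition(1000)
      .persist(StorageLevel.MEMORY_AND_DISK)

If the skew comes from a single hot key, increasing the partition count alone may not be enough, but it at least keeps typical cached blocks well under the 2 GB block ceiling.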
RE: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava
Any suggestion on how can user with custom Hadoop jar solve this issue? -Original Message- From: Patrick Wendell [mailto:pwend...@gmail.com] Sent: Sunday, November 30, 2014 11:06 PM To: Judy Nash Cc: Denny Lee; Cheng Lian; u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava Thanks Judy. While this is not directly caused by a Spark issue, it is likely other users will run into this. This is an unfortunate consequence of the way that we've shaded Guava in this release, we rely on byte code shading of Hadoop itself as well. And if the user has their own Hadoop classes present it can cause issues. On Sun, Nov 30, 2014 at 10:53 PM, Judy Nash judyn...@exchange.microsoft.com wrote: Thanks Patrick and Cheng for the suggestions. The issue was Hadoop common jar was added to a classpath. After I removed Hadoop common jar from both master and slave, I was able to bypass the error. This was caused by a local change, so no impact on the 1.2 release. -Original Message- From: Patrick Wendell [mailto:pwend...@gmail.com] Sent: Wednesday, November 26, 2014 8:17 AM To: Judy Nash Cc: Denny Lee; Cheng Lian; u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava Just to double check - I looked at our own assembly jar and I confirmed that our Hadoop configuration class does use the correctly shaded version of Guava. My best guess here is that somehow a separate Hadoop library is ending up on the classpath, possible because Spark put it there somehow. tar xvzf spark-assembly-1.3.0-SNAPSHOT-hadoop2.4.0.jar cd org/apache/hadoop/ javap -v Configuration | grep Precond Warning: Binary file Configuration contains org.apache.hadoop.conf.Configuration #497 = Utf8 org/spark-project/guava/common/base/Preconditions #498 = Class #497 // org/spark-project/guava/common/base/Preconditions #502 = Methodref #498.#501// org/spark-project/guava/common/base/Preconditions.checkArgument:(ZLj ava/lang/Object;)V 12: invokestatic #502// Method org/spark-project/guava/common/base/Preconitions.checkArgument:(ZLja va/lang/Object;)V 50: invokestatic #502// Method org/spark-project/guava/common/base/Preconitions.checkArgument:(ZLja va/lang/Object;)V On Wed, Nov 26, 2014 at 11:08 AM, Patrick Wendell pwend...@gmail.com wrote: Hi Judy, Are you somehow modifying Spark's classpath to include jars from Hadoop and Hive that you have running on the machine? The issue seems to be that you are somehow including a version of Hadoop that references the original guava package. The Hadoop that is bundled in the Spark jars should not do this. - Patrick On Wed, Nov 26, 2014 at 1:45 AM, Judy Nash judyn...@exchange.microsoft.com wrote: Looks like a config issue. I ran spark-pi job and still failing with the same guava error Command ran: .\bin\spark-class.cmd org.apache.spark.deploy.SparkSubmit --class org.apache.spark.examples.SparkPi --master spark://headnodehost:7077 --executor-memory 1G --num-executors 1 .\lib\spark-examples-1.2.1-SNAPSHOT-hadoop2.4.0.jar 100 Had used the same build steps on spark 1.1 and had no issue. From: Denny Lee [mailto:denny.g@gmail.com] Sent: Tuesday, November 25, 2014 5:47 PM To: Judy Nash; Cheng Lian; u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava To determine if this is a Windows vs. other configuration, can you just try to call the Spark-class.cmd SparkSubmit without actually referencing the Hadoop or Thrift server classes? 
On Tue Nov 25 2014 at 5:42:09 PM Judy Nash judyn...@exchange.microsoft.com wrote: I traced the code and used the following to call: Spark-class.cmd org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 spark-internal --hiveconf hive.server2.thrift.port=1 The issue ended up to be much more fundamental however. Spark doesn't work at all in configuration below. When open spark-shell, it fails with the same ClassNotFound error. Now I wonder if this is a windows-only issue or the hive/Hadoop configuration that is having this problem. From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: Tuesday, November 25, 2014 1:50 AM To: Judy Nash; u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava Oh so you're using Windows. What command are you using to start the Thrift server then? On 11/25/14 4:25 PM, Judy Nash wrote: Made progress but still blocked. After recompiling the code on cmd instead of PowerShell, now I can see all 5 classes as you mentioned. However I am still seeing the same error as before. Anything else I can check for? From:
Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava
On Tue, Dec 2, 2014 at 11:22 AM, Judy Nash judyn...@exchange.microsoft.com wrote: Any suggestion on how can user with custom Hadoop jar solve this issue? You'll need to include all the dependencies for that custom Hadoop jar to the classpath. Those will include Guava (which is not included in its original form as part of the Spark dependencies). -Original Message- From: Patrick Wendell [mailto:pwend...@gmail.com] Sent: Sunday, November 30, 2014 11:06 PM To: Judy Nash Cc: Denny Lee; Cheng Lian; u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava Thanks Judy. While this is not directly caused by a Spark issue, it is likely other users will run into this. This is an unfortunate consequence of the way that we've shaded Guava in this release, we rely on byte code shading of Hadoop itself as well. And if the user has their own Hadoop classes present it can cause issues. On Sun, Nov 30, 2014 at 10:53 PM, Judy Nash judyn...@exchange.microsoft.com wrote: Thanks Patrick and Cheng for the suggestions. The issue was Hadoop common jar was added to a classpath. After I removed Hadoop common jar from both master and slave, I was able to bypass the error. This was caused by a local change, so no impact on the 1.2 release. -Original Message- From: Patrick Wendell [mailto:pwend...@gmail.com] Sent: Wednesday, November 26, 2014 8:17 AM To: Judy Nash Cc: Denny Lee; Cheng Lian; u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava Just to double check - I looked at our own assembly jar and I confirmed that our Hadoop configuration class does use the correctly shaded version of Guava. My best guess here is that somehow a separate Hadoop library is ending up on the classpath, possible because Spark put it there somehow. tar xvzf spark-assembly-1.3.0-SNAPSHOT-hadoop2.4.0.jar cd org/apache/hadoop/ javap -v Configuration | grep Precond Warning: Binary file Configuration contains org.apache.hadoop.conf.Configuration #497 = Utf8 org/spark-project/guava/common/base/Preconditions #498 = Class #497 // org/spark-project/guava/common/base/Preconditions #502 = Methodref #498.#501// org/spark-project/guava/common/base/Preconditions.checkArgument:(ZLj ava/lang/Object;)V 12: invokestatic #502// Method org/spark-project/guava/common/base/Preconitions.checkArgument:(ZLja va/lang/Object;)V 50: invokestatic #502// Method org/spark-project/guava/common/base/Preconitions.checkArgument:(ZLja va/lang/Object;)V On Wed, Nov 26, 2014 at 11:08 AM, Patrick Wendell pwend...@gmail.com wrote: Hi Judy, Are you somehow modifying Spark's classpath to include jars from Hadoop and Hive that you have running on the machine? The issue seems to be that you are somehow including a version of Hadoop that references the original guava package. The Hadoop that is bundled in the Spark jars should not do this. - Patrick On Wed, Nov 26, 2014 at 1:45 AM, Judy Nash judyn...@exchange.microsoft.com wrote: Looks like a config issue. I ran spark-pi job and still failing with the same guava error Command ran: .\bin\spark-class.cmd org.apache.spark.deploy.SparkSubmit --class org.apache.spark.examples.SparkPi --master spark://headnodehost:7077 --executor-memory 1G --num-executors 1 .\lib\spark-examples-1.2.1-SNAPSHOT-hadoop2.4.0.jar 100 Had used the same build steps on spark 1.1 and had no issue. 
From: Denny Lee [mailto:denny.g@gmail.com] Sent: Tuesday, November 25, 2014 5:47 PM To: Judy Nash; Cheng Lian; u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava To determine if this is a Windows vs. other configuration, can you just try to call the Spark-class.cmd SparkSubmit without actually referencing the Hadoop or Thrift server classes? On Tue Nov 25 2014 at 5:42:09 PM Judy Nash judyn...@exchange.microsoft.com wrote: I traced the code and used the following to call: Spark-class.cmd org.apache.spark.deploy.SparkSubmit --class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 spark-internal --hiveconf hive.server2.thrift.port=1 The issue ended up to be much more fundamental however. Spark doesn't work at all in configuration below. When open spark-shell, it fails with the same ClassNotFound error. Now I wonder if this is a windows-only issue or the hive/Hadoop configuration that is having this problem. From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: Tuesday, November 25, 2014 1:50 AM To: Judy Nash; u...@spark.incubator.apache.org Subject: Re: latest Spark 1.2 thrift server fail with NoClassDefFoundError on Guava Oh so you're using Windows. What command are you using to start the Thrift server then? On
SchemaRDD + SQL , loading projection columns
Hi, I have 16 GB of parquet files in the /tmp/logs/ folder with the following schema: request_id (String), module (String), payload (Array[Byte]). Most of my 16 GB of data is the payload field; the request_id and module fields take less than 200 MB. I want to load the payload only when my filter condition matches.

    val sqlContext = new SQLContext(sc)
    val files = sqlContext.parquetFile("/tmp/logs")
    files.registerTempTable("logs")
    val filteredLogs = sqlContext.sql("select request_id, payload from logs where rid = 'dd4455ee' and module = 'query'")

When I run filteredLogs.collect.foreach(println), I see all of the 16 GB of data loaded. How do I load only the columns used in the filters first, and then load the payload just for the rows matching the filter criteria? Let me know if this can be done in a different way. Thank you, Vishnu. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-SQL-loading-projection-columns-tp20189.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
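One hedged workaround: query only the narrow columns first, then fetch the heavy payload column in a second pass restricted to the matching keys. This is a sketch under two assumptions: that the filter column is really request_id (the query above filters on rid, which is not in the schema), and that the SQL dialect in use accepts an IN (...) list.

    val logs = sqlContext.parquetFile("/tmp/logs")
    logs.registerTempTable("logs")

    // pass 1: touch only the small columns to find the matching ids
    val ids = sqlContext
      .sql("SELECT request_id FROM logs WHERE request_id = 'dd4455ee' AND module = 'query'")
      .map(_.getString(0))
      .collect()

    // pass 2: load the payload only for those rows
    val inList = ids.map(id => "'" + id + "'").mkString(", ")
    val payloads = sqlContext.sql(
      "SELECT request_id, payload FROM logs WHERE request_id IN (" + inList + ")")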
Unresolved attributes
I am running spark 1.1.0 DSE cassandra 4.6 when I try to run the following sql statement: val sstring = Select * from seasonality where customer_id = + customer_id + and cat_id = + seg + and period_desc = + cDate println(sstring = +sstring) val rrCheckRdd = sqlContext.sql(sstring).collect().array I get the following error: Segment Code = 205 cDate=Year_2011_Month_0_Week_0_Site reRunCheck seg = 205 sstring = Select * from seasonality where customer_id = 6 and cat_id = 205 and period_desc = Year_2011_Month_0_Week_0_Site org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree: Project [*] Filter (((customer_id#144 = 6) (CAST(cat_id#148, DoubleType) = CAST(205, DoubleType))) (period_desc#150 = 'Year_2011_Month_0_Week_0_Site)) Subquery seasonality SparkLogicalPlan (ExistingRdd [customer_id#144,period_id#145,season_id#146,cat_lvl#147,cat_id#148,season_avg#149,period_desc#150,analyzed_date#151,sum_amt#152,total_count#153,process_id#154], MapPartitionsRDD[36] at mapPartitions at basicOperators.scala:208) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:72) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:70) It looks like an internal join error or possibly something else. I need to get a work around if possible or a quick patch. Any help is appreciated. Eric -- *Eric Tanner*Big Data Developer [image: JustEnough Logo] 15440 Laguna Canyon, Suite 100 Irvine, CA 92618 Cell: Tel: Skype: Web: +1 (951) 313-9274 +1 (949) 706-0400 e http://tonya.nicholls.je/ric.tanner.je www.justenough.com Confidentiality Note: The information contained in this email and document(s) attached are for the exclusive use of the addressee and may contain confidential, privileged and non-disclosable information. If the recipient of this email is not the addressee, such recipient is strictly prohibited from reading, photocopying, distribution or otherwise using this email or its contents in any way.
WordCount fails in .textFile() method
Hi, I am trying to run JavaWordCount without using the spark-submit script. I have copied the source code for JavaWordCount and am using a JavaSparkContext with the following:

    SparkConf conf = new SparkConf().setAppName("JavaWordCount");
    conf.set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec");
    conf.setMaster("spark://127.0.0.1:7077").setSparkHome(System.getenv("SPARK_HOME"));
    JavaSparkContext jsc = new JavaSparkContext(conf);
    jsc.addJar("myJar.jar");
    new JavaWordCount(jsc).doJob();

I am getting the following error in the .textFile() method: Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode; at org.apache.spark.util.collection.OpenHashSet.org$apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261) What can I do to solve this issue? It works fine when running from command line with spark-submit script. Thanks, Rahul
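HashFunction.hashInt was added somewhere around Guava 12, so a NoSuchMethodError there usually means an older Guava ends up first on the classpath when the application is launched directly (spark-submit builds the classpath for you, which would explain why it works there). That is only a guess; if it holds, one way to force a Spark-compatible Guava in an sbt build is a dependency override. The version below is an assumption, not a verified requirement:

    // build.sbt (sketch)
    dependencyOverrides += "com.google.guava" % "guava" % "14.0.1"

Inspecting the build's dependency tree for a second, older Guava pulled in transitively would confirm or rule this out.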
SaveAsTextFile brings down data nodes with IO Exceptions
Hi all, as the last stage of execution, I am writing out a dataset to disk. Before I do this, I force the DAG to resolve so this is the only job left in the pipeline. The dataset in question is not especially large (a few gigabytes). During this step however, HDFS will inevitable crash. I will lose connection to data-nodes and get stuck in the loop of death – failure causes job restart, eventually causing the overall job to fail. On the data node logs I see the errors below. Does anyone have any ideas as to what is going on here? Thanks! java.io.IOException: Premature EOF from inputStream at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:194) at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213) at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134) at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109) at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:455) at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:741) at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:718) at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126) at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:72) at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:225) at java.lang.Thread.run(Thread.java:745) innovationdatanode03.cof.ds.capitalone.com:1004:DataXceiver error processing WRITE_BLOCK operation src: /10.37.248.60:44676 dst: /10.37.248.59:1004 java.net.SocketTimeoutException: 65000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.37.248.59:43692 remote=/10.37.248.63:1004] at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164) at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161) at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131) at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118) at java.io.FilterInputStream.read(FilterInputStream.java:83) at java.io.FilterInputStream.read(FilterInputStream.java:83) at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2101) at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:660) at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126) at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:72) at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:225) at java.lang.Thread.run(Thread.java:745) DataNode{data=FSDataset{dirpath='[/opt/cloudera/hadoop/1/dfs/dn/current, /opt/cloudera/hadoop/10/dfs/dn/current, /opt/cloudera/hadoop/2/dfs/dn/current, /opt/cloudera/hadoop/3/dfs/dn/current, /opt/cloudera/hadoop/4/dfs/dn/current, /opt/cloudera/hadoop/5/dfs/dn/current, /opt/cloudera/hadoop/6/dfs/dn/current, /opt/cloudera/hadoop/7/dfs/dn/current, /opt/cloudera/hadoop/8/dfs/dn/current, /opt/cloudera/hadoop/9/dfs/dn/current]'}, localName='innovationdatanode03.cof.ds.capitalone.com:1004', datanodeUuid='e8a11fe2-300f-4e78-9211-f2ee41af6b8c', xmitsInProgress=0}:Exception transfering block BP-1458718292-10.37.248.67-1398976716371:blk_1076854538_3118445 to mirror 10.37.248.63:1004: java.net.SocketTimeoutException: 65000 millis timeout while waiting for channel to be ready for read. 
ch : java.nio.channels.SocketChannel[connected local=/10.37.248.59:43692 remote=/10.37.248.63:1004] The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
Using SparkSQL to query Hbase entity takes very long time
Hi all, I am new to Spark and currently I am trying to run a SparkSQL query on an HBase entity. For an entity with about 4000 rows, it takes about 12 seconds. Is that expected? Is there any way to shorten the query time? Here is the code snippet:

    SparkConf sparkConf = new SparkConf().setMaster("spark://serverUrl:port").setAppName("Javasparksqltest");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    Configuration hbase_conf = HBaseConfiguration.create();
    hbase_conf.set("hbase.zookeeper.quorum", serverList);
    hbase_conf.set("hbase.regionserver.port", "60020");
    hbase_conf.set("hbase.master", master_url);
    hbase_conf.set(TableInputFormat.INPUT_TABLE, entityName);
    JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc.newAPIHadoopRDD(hbase_conf,
        TableInputFormat.class, ImmutableBytesWritable.class, Result.class).cache();
    // Generate the schema based on the string of schema
    final List<StructField> keyFields = new ArrayList<StructField>();
    for (String fieldName : this.getSchemaString().split(",")) {
        keyFields.add(DataType.createStructField(fieldName, DataType.StringType, true));
    }
    StructType schema = DataType.createStructType(keyFields);
    JavaRDD<Row> rowRDD = hBaseRDD.map(
        new Function<Tuple2<ImmutableBytesWritable, Result>, Row>() {
            public Row call(Tuple2<ImmutableBytesWritable, Result> re) throws Exception {
                return createRow(re, getSchemaString());
            }
        });
    JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(jsc);
    // Apply the schema to the RDD.
    JavaSchemaRDD schemaRDD = sqlContext.applySchema(rowRDD, schema);
    schemaRDD.registerTempTable("queryEntity");
    JavaSchemaRDD retRDD = sqlContext.sql("SELECT * FROM queryEntity WHERE name = 'Spark'");
    logger.info("retRDD count is " + retRDD.count());

thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-SparkSQL-to-query-Hbase-entity-takes-very-long-time-tp20194.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Announcing Spark 1.1.1!
I am happy to announce the availability of Spark 1.1.1! This is a maintenance release with many bug fixes, most of which are concentrated in the core. The list includes various fixes to sort-based shuffle, memory leaks, and spilling issues. Contributions to this release came from 55 developers. Visit the release notes [1] to read about the new features, or download [2] the release today. [1] http://spark.apache.org/releases/spark-release-1-1-1.html [2] http://spark.apache.org/downloads.html Please e-mail me directly about any typos in the release notes or the name listing. Thanks to everyone who contributed, and congratulations! -Andrew
Standard SQL tool access to SchemaRDD
Hello all, Is there a way to load an RDD in a small driver app and connect with a JDBC client and issue SQL queries against it? It seems the thrift server only works with pre-existing Hive tables. Thanks Jim -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Standard-SQL-tool-access-to-SchemaRDD-tp20197.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
executor logging management from python
Hi, wondering if anyone could help with this. We use an ec2 cluster to run spark apps in standalone mode. The default logging goes to /$spark_folder/work/. This folder is on the 10G root fs, so it won't take long to fill up the whole fs. My goals are: 1. move the logging location to /mnt, where we have 37G of space; 2. make the log files rotate, meaning the new ones replace the old ones after some threshold. Could you show how to do this? I was trying to change the spark-env.sh file -- I don't know if that's the best way to do it. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/executor-logging-management-from-python-tp20198.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Negative Accumulators
To answer my own question, I was declaring the accumulator incorrectly. The code should look like this:

    scala> import org.apache.spark.AccumulatorParam
    import org.apache.spark.AccumulatorParam

    scala> :paste
    // Entering paste mode (ctrl-D to finish)

    implicit object BigIntAccumulatorParam extends AccumulatorParam[BigInt] {
      def addInPlace(t1: BigInt, t2: BigInt) = t1 + t2
      def zero(initialValue: BigInt) = BigInt(0)
    }

    // Exiting paste mode, now interpreting.
    defined module BigIntAccumulatorParam

    scala> val accu = sc.accumulator(BigInt(0))(BigIntAccumulatorParam)
    accu: org.apache.spark.Accumulator[scala.math.BigInt] = 0

    scala> accu += 100

    scala> accu.value
    res1: scala.math.BigInt = 100

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Negative-Accumulators-tp19706p20199.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Calling spark from a java web application.
Jamal, I have not tried this, but can you not integrate Spark SQL with your Spring Java web app just like a standalone app? I have integrated a Scala web app (using Play) with Spark SQL and it works. Mohammed From: adrian [mailto:adria...@gmail.com] Sent: Friday, November 28, 2014 11:03 AM To: u...@spark.incubator.apache.org Subject: Re: Calling spark from a java web application. This may help: https://github.com/spark-jobserver/spark-jobserver On Fri, Nov 28, 2014 at 6:59 AM, Jamal [via Apache Spark User List] [hidden email]/user/SendEmail.jtp?type=nodenode=20017i=0 wrote: Hi, Any recommendation or tutorial on calling spark from java web application. Current setup: A spring java web application running on Jetty. Requirement: Need to run queries from the web app to spark sql. Please point or recommend the process. I can only find standalone app example on the spark webiste. Obliged Jamal If you reply to this email, your message will be added to the discussion below: http://apache-spark-user-list.1001560.n3.nabble.com/Calling-spark-from-a-java-web-application-tp20007.html To start a new topic under Apache Spark User List, email [hidden email]/user/SendEmail.jtp?type=nodenode=20017i=1 To unsubscribe from Apache Spark User List, click here. NAMLhttp://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=macro_viewerid=instant_html%21nabble%3Aemail.namlbase=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespacebreadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml View this message in context: Re: Calling spark from a java web application.http://apache-spark-user-list.1001560.n3.nabble.com/Calling-spark-from-a-java-web-application-tp20007p20017.html Sent from the Apache Spark User List mailing list archivehttp://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Standard SQL tool access to SchemaRDD
There is an experimental method that allows you to start the JDBC server with an existing HiveContext (which might have registered temporary tables). https://github.com/apache/spark/blob/master/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala#L42 On Tue, Dec 2, 2014 at 2:34 PM, Jim Carroll jimfcarr...@gmail.com wrote: Hello all, Is there a way to load an RDD in a small driver app and connect with a JDBC client and issue SQL queries against it? It seems the thrift server only works with pre-existing Hive tables. Thanks Jim -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Standard-SQL-tool-access-to-SchemaRDD-tp20197.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
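A rough sketch of how that experimental entry point can be used; the exact method name and signature may differ between versions, so treat this as an illustration of the idea rather than a confirmed API, and parseEvent plus the Event case class are made up:

    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2

    val hiveContext = new HiveContext(sc)
    import hiveContext.createSchemaRDD                              // implicit conversion for case-class RDDs

    val events = sc.textFile("hdfs:///data/events").map(parseEvent) // hypothetical RDD[Event]
    events.registerTempTable("events")                              // temporary table, visible to this context only

    HiveThriftServer2.startWithContext(hiveContext)                 // JDBC clients can now query "events"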
Re: Standard SQL tool access to SchemaRDD
Thanks! I'll give it a try. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Standard-SQL-tool-access-to-SchemaRDD-tp20197p20202.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Unresolved attributes
A little bit about how to read this output. Resolution occurs from the bottom up and when you see a tick (') it means that a field is unresolved. So here it looks like Year_2011_Month_0_Week_0_Site is missing from from your RDD. (We are working on more obvious error messages). Michael On Tue, Dec 2, 2014 at 1:00 PM, Eric Tanner eric.tan...@justenough.com wrote: I am running spark 1.1.0 DSE cassandra 4.6 when I try to run the following sql statement: val sstring = Select * from seasonality where customer_id = + customer_id + and cat_id = + seg + and period_desc = + cDate println(sstring = +sstring) val rrCheckRdd = sqlContext.sql(sstring).collect().array I get the following error: Segment Code = 205 cDate=Year_2011_Month_0_Week_0_Site reRunCheck seg = 205 sstring = Select * from seasonality where customer_id = 6 and cat_id = 205 and period_desc = Year_2011_Month_0_Week_0_Site org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: *, tree: Project [*] Filter (((customer_id#144 = 6) (CAST(cat_id#148, DoubleType) = CAST(205, DoubleType))) (period_desc#150 = 'Year_2011_Month_0_Week_0_Site)) Subquery seasonality SparkLogicalPlan (ExistingRdd [customer_id#144,period_id#145,season_id#146,cat_lvl#147,cat_id#148,season_avg#149,period_desc#150,analyzed_date#151,sum_amt#152,total_count#153,process_id#154], MapPartitionsRDD[36] at mapPartitions at basicOperators.scala:208) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:72) at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:70) It looks like an internal join error or possibly something else. I need to get a work around if possible or a quick patch. Any help is appreciated. Eric -- *Eric Tanner*Big Data Developer [image: JustEnough Logo] 15440 Laguna Canyon, Suite 100 Irvine, CA 92618 Cell: Tel: Skype: Web: +1 (951) 313-9274 +1 (949) 706-0400 e http://tonya.nicholls.je/ric.tanner.je www.justenough.com Confidentiality Note: The information contained in this email and document(s) attached are for the exclusive use of the addressee and may contain confidential, privileged and non-disclosable information. If the recipient of this email is not the addressee, such recipient is strictly prohibited from reading, photocopying, distribution or otherwise using this email or its contents in any way.
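In other words, Year_2011_Month_0_Week_0_Site is being resolved as an identifier rather than as a value. If period_desc is a string column and cDate is meant as a literal, quoting it in the generated SQL may be all that is needed; a small sketch of the same statement with the literal quoted:

    val sstring = s"SELECT * FROM seasonality WHERE customer_id = $customer_id " +
      s"AND cat_id = $seg AND period_desc = '$cDate'"   // note the single quotes around the string value
    val rrCheckRdd = sqlContext.sql(sstring).collect()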
Re: ALS failure with size Integer.MAX_VALUE
Hi Bharath, You can try setting a small item blocks in this case. 1200 is definitely too large for ALS. Please try 30 or even smaller. I'm not sure whether this could solve the problem because you have 100 items connected with 10^8 users. There is a JIRA for this issue: https://issues.apache.org/jira/browse/SPARK-3735 which I will try to implement in 1.3. I'll ping you when it is ready. Best, Xiangrui On Tue, Dec 2, 2014 at 10:40 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Yes, the issue appears to be due to the 2GB block size limitation. I am hence looking for (user, product) block sizing suggestions to work around the block size limitation. On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote: (It won't be that, since you see that the error occur when reading a block from disk. I think this is an instance of the 2GB block size limitation.) On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Hi Bharath – I’m unsure if this is your problem but the MatrixFactorizationModel in MLLIB which is the underlying component for ALS expects your User/Product fields to be integers. Specifically, the input to ALS is an RDD[Rating] and Rating is an (Int, Int, Double). I am wondering if perhaps one of your identifiers exceeds MAX_INT, could you write a quick check for that? I have been running a very similar use case to yours (with more constrained hardware resources) and I haven’t seen this exact problem but I’m sure we’ve seen similar issues. Please let me know if you have other questions. From: Bharath Ravi Kumar reachb...@gmail.com Date: Thursday, November 27, 2014 at 1:30 PM To: user@spark.apache.org user@spark.apache.org Subject: ALS failure with size Integer.MAX_VALUE We're training a recommender with ALS in mllib 1.1 against a dataset of 150M users and 4.5K items, with the total number of training records being 1.2 Billion (~30GB data). The input data is spread across 1200 partitions on HDFS. For the training, rank=10, and we've configured {number of user data blocks = number of item data blocks}. The number of user/item blocks was varied between 50 to 1200. Irrespective of the block size (e.g. at 1200 blocks each), there are atleast a couple of tasks that end up shuffle reading 9.7G each in the aggregate stage (ALS.scala:337) and failing with the following exception: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108) at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124) at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332) at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
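For reference, a minimal sketch of passing an explicit, smaller block count to ALS; the rank, iteration count, lambda and the value 30 are placeholders taken from this thread, not tuned recommendations, and trainingRecords is a hypothetical RDD of (user, item, rating) triples:

    import org.apache.spark.mllib.recommendation.{ALS, Rating}

    val ratings = trainingRecords.map { case (user, item, score) => Rating(user, item, score) }

    // rank = 10 as in the original post; 30 user/item blocks instead of 50-1200
    val model = ALS.train(ratings, 10, 10, 0.01, 30)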
object xxx is not a member of package com
Hello everyone, Could anybody tell me how to import and call the 3rd party java classes from inside spark? Here's my case: I have a jar file (the directory layout is com.xxx.yyy.zzz) which contains some java classes, and I need to call some of them in spark code. I used the statement import com.xxx.yyy.zzz._ on top of the impacted spark file and set the location of the jar file in the CLASSPATH environment, and use .sbt/sbt assembly to build the project. As a result, I got an error saying object xxx is not a member of package com. I thought that this could be related to the dependencies, but couldn't figure it out. Any suggestion/solution from you would be appreciated! Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/object-xxx-is-not-a-member-of-package-com-tp20205.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
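Setting the CLASSPATH environment variable generally has no effect on an sbt build; the jar needs to be declared to the build itself for sbt assembly to resolve the package. A small sketch, assuming the jar can be placed at a path the project can see (the file name is made up):

    // build.sbt (sketch)
    unmanagedJars in Compile += file("lib/third-party-classes.jar")

Alternatively, simply dropping the jar into the project's lib/ directory should let sbt pick it up as an unmanaged dependency without any extra setting.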
Re: Any ideas why a few tasks would stall
Have you tried taking thread dumps via the UI? There is a link to do so on the Executors page (typically under http://<driver IP>:4040/executors). By visualizing the thread call stack of the executors with slow running tasks, you can see exactly what code is executing at an instant in time. If you sample the executor several times in a short time period, you can identify 'hot spots' or expensive sections in the user code. On Tue, Dec 2, 2014 at 3:03 PM, Steve Lewis lordjoe2...@gmail.com wrote: I am working on a problem which will eventually involve many millions of function calls. I have a small sample with several thousand calls working, but when I try to scale up the amount of data things stall. I use 120 partitions and 116 finish in very little time. The remaining 4 seem to do all the work and stall after a fixed number (about 1000) of calls, and even after hours make no more progress. This is my first large and complex job with spark and I would like any insight on how to debug the issue, or even better why it might exist. The cluster has 15 machines and I am setting executor memory at 16G. Also, what other questions are relevant to solving the issue?
Re: Kryo NPE with Array
I finally solved this issue. The problem was that: 1. I defined a case class with a Buffer[MyType] field. 2. I instantiated the class with the field set to the value given by an implicit conversion from a Java list, which is supposedly a Buffer. 3. However, the underlying type of that field was instead scala.collection.convert.Wrappers.JListWrapper, as noted in the exception above. This type was not registered with Kryo and so that's why I got the exception. Registering the type did not solve the problem. However, an additional call to .toBuffer did solve the problem, since the Buffer class is registered through the Chill AllScalaRegistrar which is called by the Spark Kryo serializer. I thought I'd document this in case somebody else is running into a similar issue. Simone Franzini, PhD http://www.linkedin.com/in/simonefranzini On Wed, Nov 26, 2014 at 7:40 PM, Simone Franzini captainfr...@gmail.com wrote: I guess I already have the answer of what I have to do here, which is to configure the kryo object with the strategy as above. Now the question becomes: how can I pass this custom kryo configuration to the spark kryo serializer / kryo registrator? I've had a look at the code but I am still fairly new to Scala and I can't see how I would do this. In the worst case, could I override the newKryo method and put my configuration there? It appears to me that method is the one where the kryo instance is created. Simone Franzini, PhD http://www.linkedin.com/in/simonefranzini On Tue, Nov 25, 2014 at 2:38 PM, Simone Franzini captainfr...@gmail.com wrote: I am running into the following NullPointerException: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException Serialization trace: underlying (scala.collection.convert.Wrappers$JListWrapper) myArrayField (MyCaseClass) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) I have been running into similar issues when using avro classes, that I was able to resolve by registering them with a Kryo serializer that uses chill-avro. However, in this case the field is in a case class and it seems that registering the class does not help. I found this stack overflow that seems to be relevant: http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist I have this line of code translated to Scala, that supposedly solves the issue: val kryo = new Kryo() kryo.getInstantiatorStrategy().asInstanceOf[Kryo.DefaultInstantiatorStrategy].setFallbackInstantiatorStrategy(new StdInstantiatorStrategy()) However, I am not sure where this line should be placed to take effect. I already have the following, should it go somewhere in here? class MyRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo) { kryo.register(...) } } Simone Franzini, PhD http://www.linkedin.com/in/simonefranzini
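For anyone hitting the same thing, a minimal sketch of the fix described above; MyType, MyCaseClass and javaList are placeholders matching the names in this thread:

    import scala.collection.JavaConverters._
    import scala.collection.mutable.Buffer

    case class MyCaseClass(myArrayField: Buffer[MyType])

    // javaList: java.util.List[MyType] coming back from a Java API
    val field = javaList.asScala.toBuffer   // .toBuffer copies out of Wrappers.JListWrapper
    val record = MyCaseClass(field)         // Kryo now only sees Buffer/ArrayBuffer, which Chill registers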
Reading from Kerberos Secured HDFS in Spark?
Hi everyone, I¹ve been trying to set up Spark so that it can read data from HDFS, when the HDFS cluster is integrated with Kerberos authentication. I¹ve been using the Spark shell to attempt to read from HDFS, in local mode. I¹ve set all of the appropriate properties in core-site.xml and hdfs-site.xml as I can access and write data using the Hadoop command line utilities. I¹ve also set HADOOP_CONF_DIR to point to the directory where core-site.xml and hdfs-site.xml live. I used UserGroupInformation.setConfiguration(conf) and UserGroupInformation.loginUserFromKeytab() to set up the token, and then SparkContext.newAPIHadoopFile( conf) (instead of SparkContext.textFile() which I would think not pass the appropriate configurations with the Kerberos credentials). When I do that, I get the stack trace (sorry about the color): java.io.IOException: Can't get Master Kerberos principal for use as renewer at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInte rnal(TokenCache.java:116) at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInte rnal(TokenCache.java:100) at org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(Tok enCache.java:80) at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFo rmat.java:242) at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFor mat.java:385) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:94) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) I was wondering if anyone has had any experience setting up Spark to read from Kerberized HDFS. What configurations needed to be set in spark-env.sh? What am I missing? Also, will I have an issue if I try to access HDFS in distributed mode, using a standalone setup? Thanks, -Matt Cheah smime.p7s Description: S/MIME cryptographic signature
Re: executor logging management from python
cat spark-env.sh -- #!/usr/bin/env bash export SPARK_WORKER_OPTS=-Dspark.executor.logs.rolling.strategy=time -Dspark.executor.logs.rolling.time.interval=daily -Dspark.executor.logs.rolling.maxRetainedFiles=3 export SPARK_LOCAL_DIRS=/mnt/spark export SPARK_WORKER_DIR=/mnt/spark -- But the spark log still writes to /$Spark_home/work folder. why spark doesn't take the changes? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/executor-logging-management-from-python-tp20198p20210.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Any ideas why a few tasks would stall
1) I can go there but none of the links are clickable. 2) When I see something like 116/120 partitions succeeded in the Stages UI, in the Storage UI I see the following (note that RDD 27 has 116 partitions cached, 4 not, and those are exactly the number of machines which will not complete). Also, RDD 27 does not show up in the Stages UI.

    RDD Name | Storage Level                     | Cached Partitions | Fraction Cached | Size in Memory | Size in Tachyon | Size on Disk
    2        | Memory Deserialized 1x Replicated | 1                 | 100%            | 11.8 MB        | 0.0 B           | 0.0 B
    14       | Memory Deserialized 1x Replicated | 1                 | 100%            | 122.7 MB       | 0.0 B           | 0.0 B
    7        | Memory Deserialized 1x Replicated | 120               | 100%            | 151.1 MB       | 0.0 B           | 0.0 B
    1        | Memory Deserialized 1x Replicated | 1                 | 100%            | 65.6 MB        | 0.0 B           | 0.0 B
    10       | Memory Deserialized 1x Replicated | 24                | 100%            | 160.6 MB       | 0.0 B           | 0.0 B
    27       | Memory Deserialized 1x Replicated | 116               | 97%             |                |                 |

On Tue, Dec 2, 2014 at 3:43 PM, Sameer Farooqui same...@databricks.com wrote: Have you tried taking thread dumps via the UI? There is a link to do so on the Executors page (typically under http://<driver IP>:4040/executors). By visualizing the thread call stack of the executors with slow running tasks, you can see exactly what code is executing at an instant in time. If you sample the executor several times in a short time period, you can identify 'hot spots' or expensive sections in the user code. On Tue, Dec 2, 2014 at 3:03 PM, Steve Lewis lordjoe2...@gmail.com wrote: I am working on a problem which will eventually involve many millions of function calls. I have a small sample with several thousand calls working, but when I try to scale up the amount of data things stall. I use 120 partitions and 116 finish in very little time. The remaining 4 seem to do all the work and stall after a fixed number (about 1000) of calls, and even after hours make no more progress. This is my first large and complex job with spark and I would like any insight on how to debug the issue, or even better why it might exist. The cluster has 15 machines and I am setting executor memory at 16G. Also, what other questions are relevant to solving the issue? -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
Re: Calling spark from a java web application.
We have a web application which talks to spark server. This is how we have done the integration. 1) In the tomcat's classpath, add the spark distribution jar for spark code to be available at runtime ( for you it would be Jetty). 2) In the Web application project, add the spark distribution jar in the classpath ( Could be Java / Web project). 3) Setup the FAIR scheduling mode, which would help send parallel requests from web application to the spark cluster. 4) In our application startup, initialize the connection to the spark cluster. This is composed of creating the JavaSparkContext and making it available throughout the web application, in case this needs to be the only Driver Program required by the web application. 5) Using the JavaSpark Context, Create RDD's and make them available globally to the web application code. 6) invoke transformation / actions as required. Hopefully this info is of some use.. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Calling-spark-from-a-java-web-application-tp20007p20213.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
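A rough Scala sketch of steps 3 to 5 above, i.e. one driver-side context created at startup and shared by request handlers; SparkHolder, the master URL and the HDFS path are made-up placeholders:

    import org.apache.spark.{SparkConf, SparkContext}

    object SparkHolder {
      // single shared driver context for the whole web application
      lazy val sc: SparkContext = {
        val conf = new SparkConf()
          .setAppName("webapp-driver")
          .setMaster("spark://master-host:7077")   // assumption: standalone cluster URL
          .set("spark.scheduler.mode", "FAIR")     // step 3: let parallel web requests share the cluster
        new SparkContext(conf)
      }

      // step 5: RDDs cached once and reused across requests
      lazy val events = sc.textFile("hdfs:///data/events").cache()
    }

    // inside a request handler (step 6):
    // val hits = SparkHolder.events.filter(_.contains(query)).take(100)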
Re: Problem creating EC2 cluster using spark-ec2
Interesting. Do you have any problems when launching in us-east-1? What is the full output of spark-ec2 when launching a cluster? (Post it to a gist if it’s too big for email.) On Mon, Dec 1, 2014 at 10:34 AM, Dave Challis dave.chal...@aistemos.com wrote: I've been trying to create a Spark cluster on EC2 using the documentation at https://spark.apache.org/docs/latest/ec2-scripts.html (with Spark 1.1.1). Running the script successfully creates some EC2 instances, HDFS etc., but appears to fail to copy the actual files needed to run Spark across. I ran the following commands: $ cd ~/src/spark-1.1.1/ec2 $ ./spark-ec2 --key-pair=* --identity-file=* --slaves=1 --region=eu-west-1 --zone=eu-west-1a --instance-type=m3.medium --no-ganglia launch foocluster I see the following in the script's output: (instance and HDFS set up happens here) ... Persistent HDFS installed, won't start by default... ~/spark-ec2 ~/spark-ec2 Setting up spark-standalone RSYNC'ing /root/spark/conf to slaves... *.eu-west-1.compute.amazonaws.com RSYNC'ing /root/spark-ec2 to slaves... *.eu-west-1.compute.amazonaws.com ./spark-standalone/setup.sh: line 22: /root/spark/sbin/stop-all.sh: No such file or directory ./spark-standalone/setup.sh: line 27: /root/spark/sbin/start-master.sh: No such file or directory ./spark-standalone/setup.sh: line 33: /root/spark/sbin/start-slaves.sh: No such file or directory Setting up tachyon RSYNC'ing /root/tachyon to slaves... ... (Tachyon setup happens here without any problem) I can ssh to the master (using the ./spark-ec2 login), and looking in /root/, it contains: $ ls /root ephemeral-hdfs hadoop-native mapreduce persistent-hdfs scala shark spark spark-ec2 tachyon If I look in /root/spark (where the sbin directory should be found), it only contains a single 'conf' directory: $ ls /root/spark conf Any idea why spark-ec2 might have failed to copy these files across? Thanks, Dave - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Problem creating EC2 cluster using spark-ec2
+Andrew Actually I think this is because we haven't uploaded the Spark binaries to cloudfront / pushed the change to mesos/spark-ec2. Andrew, can you take care of this ? On Tue, Dec 2, 2014 at 5:11 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Interesting. Do you have any problems when launching in us-east-1? What is the full output of spark-ec2 when launching a cluster? (Post it to a gist if it’s too big for email.) On Mon, Dec 1, 2014 at 10:34 AM, Dave Challis dave.chal...@aistemos.com wrote: I've been trying to create a Spark cluster on EC2 using the documentation at https://spark.apache.org/docs/latest/ec2-scripts.html (with Spark 1.1.1). Running the script successfully creates some EC2 instances, HDFS etc., but appears to fail to copy the actual files needed to run Spark across. I ran the following commands: $ cd ~/src/spark-1.1.1/ec2 $ ./spark-ec2 --key-pair=* --identity-file=* --slaves=1 --region=eu-west-1 --zone=eu-west-1a --instance-type=m3.medium --no-ganglia launch foocluster I see the following in the script's output: (instance and HDFS set up happens here) ... Persistent HDFS installed, won't start by default... ~/spark-ec2 ~/spark-ec2 Setting up spark-standalone RSYNC'ing /root/spark/conf to slaves... *.eu-west-1.compute.amazonaws.com RSYNC'ing /root/spark-ec2 to slaves... *.eu-west-1.compute.amazonaws.com ./spark-standalone/setup.sh: line 22: /root/spark/sbin/stop-all.sh: No such file or directory ./spark-standalone/setup.sh: line 27: /root/spark/sbin/start-master.sh: No such file or directory ./spark-standalone/setup.sh: line 33: /root/spark/sbin/start-slaves.sh: No such file or directory Setting up tachyon RSYNC'ing /root/tachyon to slaves... ... (Tachyon setup happens here without any problem) I can ssh to the master (using the ./spark-ec2 login), and looking in /root/, it contains: $ ls /root ephemeral-hdfs hadoop-native mapreduce persistent-hdfs scala shark spark spark-ec2 tachyon If I look in /root/spark (where the sbin directory should be found), it only contains a single 'conf' directory: $ ls /root/spark conf Any idea why spark-ec2 might have failed to copy these files across? Thanks, Dave - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Low Level Kafka Consumer for Spark
Hi Rod, The purpose of introducing WAL mechanism in Spark Streaming as a general solution is to make all the receivers be benefit from this mechanism. Though as you said, external sources like Kafka have their own checkpoint mechanism, instead of storing data in WAL, we can only store metadata to WAL, and recover from the last committed offsets. But this requires sophisticated design of Kafka receiver with low-level API involved, also we need to take care of rebalance and fault tolerance things by ourselves. So right now instead of implementing a whole new receiver, we choose to implement a simple one, though the performance is not so good, it's much easier to understand and maintain. The design purpose and implementation of reliable Kafka receiver can be found in (https://issues.apache.org/jira/browse/SPARK-4062). And in future, to improve the reliable Kafka receiver like what you mentioned is on our scheduler. Thanks Jerry -Original Message- From: RodrigoB [mailto:rodrigo.boav...@aspect.com] Sent: Wednesday, December 3, 2014 5:44 AM To: u...@spark.incubator.apache.org Subject: Re: Low Level Kafka Consumer for Spark Dibyendu, Just to make sure I will not be misunderstood - My concerns are referring to the Spark upcoming solution and not yours. I would to gather the perspective of someone which implemented recovery with Kafka a different way. Tnks, Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p20196.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: IP to geo information in spark streaming application
1) I think using a library-based solution is a better idea; we used that, and it works. 2) We used a broadcast variable, and it works. qinwei From: Noam Kfir Date: 2014-12-02 23:14 To: user@spark.apache.org Subject: IP to geo information in spark streaming application Hi, I'm new to spark streaming. I'm currently writing a spark streaming application to standardize events coming from Kinesis. As part of the logic, I want to use an IP-to-geo information library or service. My questions: 1) If I were to use some REST service for this task, do you think it would cause a performance penalty (over using a library-based solution)? 2) If I were to use a library-based solution, I will have to use some local db file. What mechanism should I use in order to transfer such a db file? A broadcast variable? Tx, Noam.
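A small sketch of option 2, broadcasting a locally loaded geo database once so each executor does lookups in memory instead of calling a remote service per record; GeoDb, loadGeoDb, lookup and the field names are invented for illustration:

    // driver side: load the db file once and broadcast it (the object must be serializable)
    val geoDb = ssc.sparkContext.broadcast(loadGeoDb("/path/to/geoip.db"))   // loadGeoDb is hypothetical

    // executor side: enrich each event in the stream with a purely local lookup
    val enriched = events.map { event =>
      val loc = geoDb.value.lookup(event.ip)                                 // lookup is hypothetical
      (event, loc.country, loc.city)
    }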
Re: Spark SQL table Join, one task is taking long
Bump up. Michael Armbrust, anybody from Spark SQL team? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124p20218.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Viewing web UI after fact
How do you solved this problem? I run the standalone application, but there is no APPLICATION_COMPLETE file too. On Sat, Nov 8, 2014 at 2:11 PM, Arun Ahuja aahuj...@gmail.com wrote: We are running our applications through YARN and are only somtimes seeing them into the History Server. Most do not seem to have the APPLICATION_COMPLETE file. Specifically any job that ends because of yarn application -kill does not show up. For other ones what would be a reason for them not to appear in the Spark UI? Is there any update on this? Thanks, Arun On Mon, Sep 15, 2014 at 4:10 AM, Grzegorz Białek grzegorz.bia...@codilime.com wrote: Hi Andrew, sorry for late response. Thank you very much for solving my problem. There was no APPLICATION_COMPLETE file in log directory due to not calling sc.stop() at the end of program. With stopping spark context everything works correctly, so thank you again. Best regards, Grzegorz On Fri, Sep 5, 2014 at 8:06 PM, Andrew Or and...@databricks.com wrote: Hi Grzegorz, Can you verify that there are APPLICATION_COMPLETE files in the event log directories? E.g. Does file:/tmp/spark-events/app-name-1234567890/APPLICATION_COMPLETE exist? If not, it could be that your application didn't call sc.stop(), so the ApplicationEnd event is not actually logged. The HistoryServer looks for this special file to identify applications to display. You could also try manually adding the APPLICATION_COMPLETE file to this directory; the HistoryServer should pick this up and display the application, though the information displayed will be incomplete because the log did not capture all the events (sc.stop() does a final close() on the file written). Andrew 2014-09-05 1:50 GMT-07:00 Grzegorz Białek grzegorz.bia...@codilime.com : Hi Andrew, thank you very much for your answer. Unfortunately it still doesn't work. I'm using Spark 1.0.0, and I start history server running sbin/start-history-server.sh dir, although I also set SPARK_HISTORY_OPTS=-Dspark.history.fs.logDirectory in conf/spark-env.sh. I tried also other dir than /tmp/spark-events which have all possible permissions enabled. Also adding file: (and file://) didn't help - history server still shows: History Server Event Log Location: file:/tmp/spark-events/ No Completed Applications Found. Best regards, Grzegorz On Thu, Sep 4, 2014 at 8:20 PM, Andrew Or and...@databricks.com wrote: Hi Grzegorz, Sorry for the late response. Unfortunately, if the Master UI doesn't know about your applications (they are completed with respect to a different Master), then it can't regenerate the UIs even if the logs exist. You will have to use the history server for that. How did you start the history server? If you are using Spark =1.0, you can pass the directory as an argument to the sbin/start-history-server.sh script. Otherwise, you may need to set the following in your conf/spark-env.sh to specify the log directory: export SPARK_HISTORY_OPTS=-Dspark.history.fs.logDirectory=/tmp/spark-events It could also be a permissions thing. Make sure your logs in /tmp/spark-events are accessible by the JVM that runs the history server. Also, there's a chance that /tmp/spark-events is interpreted as an HDFS path depending on which Spark version you're running. To resolve any ambiguity, you may set the log path to file:/tmp/spark-events instead. But first verify whether they actually exist. 
Let me know if you get it working, -Andrew 2014-08-19 8:23 GMT-07:00 Grzegorz Białek grzegorz.bia...@codilime.com: Hi, Is there any way view history of applications statistics in master ui after restarting master server? I have all logs ing /tmp/spark-events/ but when I start history server in this directory it says No Completed Applications Found. Maybe I could copy this logs to dir used by master server but I couldn't find any. Or maybe I'm doing something wrong launching history server. Do you have any idea how to solve it? Thanks, Grzegorz On Thu, Aug 14, 2014 at 10:53 AM, Grzegorz Białek grzegorz.bia...@codilime.com wrote: Hi, Thank you both for your answers. Browsing using Master UI works fine. Unfortunately History Server shows No Completed Applications Found even if logs exists under given directory, but using Master UI is enough for me. Best regards, Grzegorz On Wed, Aug 13, 2014 at 8:09 PM, Andrew Or and...@databricks.com wrote: The Spark UI isn't available through the same address; otherwise new applications won't be able to bind to it. Once the old application finishes, the standalone Master renders the after-the-fact application UI and exposes it under a different URL. To see this, go to the Master UI (master-url:8080) and click on your application in the Completed Applications table. 2014-08-13 10:56 GMT-07:00 Matei Zaharia matei.zaha...@gmail.com: Take a look at http://spark.apache.org/docs/latest/monitoring.html -- you need to
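As noted in this thread, the APPLICATION_COMPLETE marker is only written when the context is stopped cleanly, so the usual pattern is to guarantee sc.stop() runs even if the job fails; a minimal sketch:

    val sc = new SparkContext(conf)
    try {
      // ... job logic ...
    } finally {
      sc.stop()   // closes the event log so the history server / Master UI can render the finished app
    }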
Monitoring Spark
hello, im running spark on a cluster and i want to monitor how many nodes/ cores are active in different (specific) points of the program. is there any way to do this? thanks, Isca
Re: Monitoring Spark
Hi Isca, I think SPM can do that for you: http://blog.sematext.com/2014/10/07/apache-spark-monitoring/ Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/ On Tue, Dec 2, 2014 at 11:57 PM, Isca Harmatz pop1...@gmail.com wrote: hello, im running spark on a cluster and i want to monitor how many nodes/ cores are active in different (specific) points of the program. is there any way to do this? thanks, Isca