SparkR Installation
Hi All, I wanted to try SparkR. Do we need preinstalled R on all the nodes of the cluster before installing SparkR package? Please guide me how to proceed with this. As of now, I work with R only on single node. Please suggest. Thanks Stuti Awasthi
Re: Enormous EC2 price jump makes r3.large patch more important
Ah, right. So only the launch script has changed. Everything else is still essentially binary compatible? Well, that makes it too easy! Thanks! On Wed, Jun 18, 2014 at 2:35 PM, Patrick Wendell pwend...@gmail.com wrote: Actually you'll just want to clone the 1.0 branch then use the spark-ec2 script in there to launch your cluster. The --spark-git-repo flag is if you want to launch with a different version of Spark on the cluster. In your case you just need a different version of the launch script itself, which will be present in the 1.0 branch of Spark. - Patrick On Tue, Jun 17, 2014 at 9:29 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: I am about to spin up some new clusters, so I may give that a go... any special instructions for making them work? I assume I use the --spark-git-repo= option on the spark-ec2 command. Is it as easy as concatenating your string as the value? On cluster management GUIs... I've been looking around at Amabari, Datastax, Cloudera, OpsCenter etc. Not totally convinced by any of them yet. Anyone using a good one I should know about? I'm really beginning to lean in the direction of Cassandra as the distributed data store... On Wed, Jun 18, 2014 at 1:46 PM, Patrick Wendell pwend...@gmail.com wrote: By the way, in case it's not clear, I mean our maintenance branches: https://github.com/apache/spark/tree/branch-1.0 On Tue, Jun 17, 2014 at 8:35 PM, Patrick Wendell pwend...@gmail.com wrote: Hey Jeremy, This is patched in the 1.0 and 0.9 branches of Spark. We're likely to make a 1.0.1 release soon (this patch being one of the main reasons), but if you are itching for this sooner, you can just checkout the head of branch-1.0 and you will be able to use r3.XXX instances. - Patrick On Tue, Jun 17, 2014 at 4:17 PM, Jeremy Lee unorthodox.engine...@gmail.com wrote: Some people (me included) might have wondered why all our m1.large spot instances (in us-west-1) shut down a few hours ago... Simple reason: The EC2 spot price for Spark's default m1.large instances just jumped from 0.016 per hour, to about 0.750. Yes, Fifty times. Probably something to do with world cup. So far this is just us-west-1, but prices have a tendency to equalize across centers as the days pass. Time to make backups and plans. m3 spot prices are still down at $0.02 (and being new, will be bypassed by older systems), so it would be REAAALLYY nice if there had been some progress on that issue. Let me know if I can help with testing and whatnot. -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers -- Jeremy Lee BCompSci(Hons) The Unorthodox Engineers
Re: question about setting SPARK_CLASSPATH IN spark_env.sh
Thanks, I hope this problem will go away once I upgrade to Spark 1.0, where we can set the cluster-wide classpath using the spark-submit command. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/question-about-setting-SPARK-CLASSPATH-IN-spark-env-sh-tp7809p7822.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Memory footprint of Calliope: Spark - Cassandra writes
Gerard, We haven't done a test on Calliope vs a driver. The thing is Calliope builds on C* thrift (and latest build on DS driver) and the performance in terms of simple write will be similar to any existing driver. But then that is not the use case for Calliope. It is built to be used from Spark and to harness the distributed nature of Spark. With a regular driver you would have to take care of multithreading, splitting the data, etc. While with spark and Calliope this comes free. Regards, Rohit On Tue, Jun 17, 2014 at 9:24 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi Rohit, Thanks a lot for looking at this. The intention of calculating the data upfront it to only benchmark the time it takes store in records/sec eliminating the generation factor from it (which will be different on the real scenario, reading from HDFS) I used a profiler today and indeed it's not the storage part, but the generation that's bloating the memory. Objects in memory take surprisingly more space that one would expect based on the data they hold. In my case it was 2.1x the size of the original data. Now that we are talking about this, do you have some figures of how Calliope compares -performance wise- to a classic Cassandra driver (DataStax / Astyanax) ? that would be awesome. Thanks again! -kr, Gerard. On Tue, Jun 17, 2014 at 4:27 PM, tj opensource opensou...@tuplejump.com wrote: Dear Gerard, I just tried the code you posted in the gist ( https://gist.github.com/maasg/68de6016bffe5e71b78c) and it does give a OOM. It is cause of the data being generated locally and then paralellized - -- val entries = for (i - 1 to total) yield { Array(sdevy$i, aggr, 1000, sum, (i to i+10).mkString(,)) } val rdd = sc.parallelize(entries,8) -- This will generate all the data on the local system and then try to partition it. Instead, we should paralellize the keys (i - 1 to total) and generate data in the map tasks. This is *closer* to what you will get if you distribute out a file on a DFS like HDFS/SnackFS. I have made the change in the script here ( https://gist.github.com/milliondreams/aac52e08953949057e7d) -- val rdd = sc.parallelize(1 to total, 8).map(i = Array(sdevy$i, aggr, 1000, sum, (i to i+10).mkString(,))) -- I was able to insert 50M records using just over 350M RAM. Attaching the log and screenshot. Let me know if you still face this issue... we can do a screen share and resolve thee issue there. And thanks for using Calliope. I hope it serves your needs. Cheers, Rohit On Mon, Jun 16, 2014 at 9:57 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi, I've been doing some testing with Calliope as a way to do batch load from Spark into Cassandra. My initial results are promising on the performance area, but worrisome on the memory footprint side. I'm generating N records of about 50 bytes each and using the UPDATE mutator to insert them into C*. I get OOM if my memory is below 1GB per million of records, or about 50Mb of raw data (without counting any RDD/structural overhead). (See code [1]) (so, to avoid confusions: e.g.: I need 4GB RAM to save 4M of 50Byte records to Cassandra) That's an order of magnitude more than the RAW data. I understood that Calliope builds on top of the Hadoop support of Cassandra, which builds on top of SSTables and sstableloader. I would like to know what's the memory usage factor of Calliope and what parameters could I use to control/tune that. Any experience/advice on that? -kr, Gerard. [1] https://gist.github.com/maasg/68de6016bffe5e71b78c
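A minimal Scala sketch of the two generation strategies discussed above, with the string quoting restored; it assumes a SparkContext named sc and a row count named total, as in the gists.
==
// Approach that caused the OOM: every row is materialized on the driver first,
// then shipped out by parallelize.
val entriesLocal = for (i <- 1 to total) yield {
  Array(s"devy$i", "aggr", "1000", "sum", (i to i + 10).mkString(","))
}
val rddLocal = sc.parallelize(entriesLocal, 8)

// Revised approach: only the keys are parallelized and each row is generated
// inside the map task, so the driver never holds the full data set.
val rdd = sc.parallelize(1 to total, 8).map { i =>
  Array(s"devy$i", "aggr", "1000", "sum", (i to i + 10).mkString(","))
}
==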
Re: Wildcard support in input path
Hi Andrew, Strangely in my spark (1.0.0 compiled against hadoop 2.4.0) log, it says file not found. I'll try again. Jianshi On Wed, Jun 18, 2014 at 12:36 PM, Andrew Ash and...@andrewash.com wrote: In Spark you can use the normal globs supported by Hadoop's FileSystem, which are documented here: http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus(org.apache.hadoop.fs.Path) On Wed, Jun 18, 2014 at 12:09 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi Jianshi, I have used wild card characters (*) in my program and it worked.. My code was like this b = sc.textFile(hdfs:///path to file/data_file_2013SEP01*) Thanks Regards, Meethu M On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: It would be convenient if Spark's textFile, parquetFile, etc. can support path with wildcard, such as: hdfs://domain/user/jianshuang/data/parquet/table/month=2014* Or is there already a way to do it now? Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Wildcard support in input path
Hi all, Thanks for the reply. I'm using parquetFile as input, is that a problem? In hadoop fs -ls, the path (hdfs://domain/user/jianshuang/data/parquet/table/month=2014*) will get list all the files. I'll test it again. Jianshi On Wed, Jun 18, 2014 at 2:23 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi Andrew, Strangely in my spark (1.0.0 compiled against hadoop 2.4.0) log, it says file not found. I'll try again. Jianshi On Wed, Jun 18, 2014 at 12:36 PM, Andrew Ash and...@andrewash.com wrote: In Spark you can use the normal globs supported by Hadoop's FileSystem, which are documented here: http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus(org.apache.hadoop.fs.Path) On Wed, Jun 18, 2014 at 12:09 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi Jianshi, I have used wild card characters (*) in my program and it worked.. My code was like this b = sc.textFile(hdfs:///path to file/data_file_2013SEP01*) Thanks Regards, Meethu M On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: It would be convenient if Spark's textFile, parquetFile, etc. can support path with wildcard, such as: hdfs://domain/user/jianshuang/data/parquet/table/month=2014* Or is there already a way to do it now? Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
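A short Scala sketch of the two glob patterns being discussed; the paths are the placeholders from the thread, and whether parquetFile expands globs in this Spark build is exactly the open question.
==
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc = new SparkContext("local", "glob-test")
val sqlContext = new SQLContext(sc)

// Plain text input: Hadoop FileSystem globs work here, as Meethu reports.
val lines = sc.textFile("hdfs:///path/to/file/data_file_2013SEP01*")

// Parquet input over Hive-style partition directories, the pattern Jianshi is
// trying; behaviour depends on how parquetFile resolves the glob in this release.
val table = sqlContext.parquetFile(
  "hdfs://domain/user/jianshuang/data/parquet/table/month=2014*")
==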
Re: Contribution to Spark MLLib
Hello Xiangrui, Thanks for sharing the roadmap. It really helped. Regards, Jayati -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Contribution-to-Spark-MLLib-tp7716p7826.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark Streaming Example with CDH5
There is nothing special about CDH5 Spark in this regard. CDH 5.0.x has Spark 0.9.0, and the imminent next release will have 1.0.0 + upstream patches. You're simply accessing a class that was not present in 0.9.0, but is present after that: https://github.com/apache/spark/commits/master/core/src/main/scala/org/apache/spark/SecurityManager.scala On Wed, Jun 18, 2014 at 3:14 AM, manas Kar manas@exactearth.com wrote: Hi Spark Gurus, I am trying to compile a spark streaming example with CDH5 and having problem compiling it. Has anyone created an example spark streaming using CDH5(preferably Spark 0.9.1) and would be kind enough to share the build.sbt(.scala) file?(or point to their example on github). I know there is a streaming example here https://github.com/apache/spark/tree/master/examples but I am looking for something that runs with CDH5. My build.scala files looks like given below. object Dependency { // Versions object V { val Akka = 2.3.0 val scala = 2.10.4 val cloudera = 0.9.0-cdh5.0.0 } val sparkCore = org.apache.spark %% spark-core% V.cloudera val sparkStreaming = org.apache.spark %% spark-streaming % V.cloudera resolvers ++= Seq( cloudera repo at https://repository.cloudera.com/artifactory/cloudera-repos/;, haddop repo at https://repository.cloudera.com/content/repositories/releases/;) I have also attached the complete build.scala file for sake of completeness. sbt dist gives the following error: object SecurityManager is not a member of package org.apache.spark [error] import org.apache.spark.{SparkConf, SecurityManager} build.scala http://apache-spark-user-list.1001560.n3.nabble.com/file/n7796/build.scala Appreciate the great work the spark community is doing. It is by far the best thing I have worked on. ..Manas -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Example-with-CDH5-tp7796.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
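A hypothetical cleaned-up fragment of the build definition quoted above (string quoting restored, versions unchanged). Note Sean's point: 0.9.0-cdh5.0.0 predates SecurityManager, so code importing it will not compile against these artifacts.
==
import sbt._
import Keys._

object Dependency {
  object V {
    val Akka     = "2.3.0"
    val scala    = "2.10.4"
    val cloudera = "0.9.0-cdh5.0.0"   // CDH 5.0.x ships Spark 0.9.0
  }
  val sparkCore      = "org.apache.spark" %% "spark-core"      % V.cloudera
  val sparkStreaming = "org.apache.spark" %% "spark-streaming" % V.cloudera
}

object Repos {
  // Resolvers from the original message, restored as a settings fragment.
  val settings = Seq(
    resolvers ++= Seq(
      "cloudera repo" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
      "hadoop repo"   at "https://repository.cloudera.com/content/repositories/releases/"
    ),
    libraryDependencies ++= Seq(Dependency.sparkCore, Dependency.sparkStreaming)
  )
}
==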
Re: join operation is taking too much time
Hi, Thanks Andrew and Daniel for the response. Setting spark.shuffle.spill to false didnt make any difference. 5 days completed in 6 min and 10 days was stuck after around 1hr. Daniel,in my current use case I cant read all the files to a single RDD.But I have another use case where I did it in that way,ie I read all the files to a single RDD and joined with with the RDD of 9 million rows and it worked fine and took only 3 minutes. Thanks Regards, Meethu M On Wednesday, 18 June 2014 12:11 AM, Daniel Darabos daniel.dara...@lynxanalytics.com wrote: I've been wondering about this. Is there a difference in performance between these two? valrdd1 =sc.textFile(files.mkString(,))valrdd2 =sc.union(files.map(sc.textFile(_))) I don't know about your use-case, Meethu, but it may be worth trying to see if reading all the files into one RDD (like rdd1) would perform better in the join. (If this is possible in your situation.) On Tue, Jun 17, 2014 at 6:45 PM, Andrew Or and...@databricks.com wrote: How long does it get stuck for? This is a common sign for the OS thrashing due to out of memory exceptions. If you keep it running longer, does it throw an error? Depending on how large your other RDD is (and your join operation), memory pressure may or may not be the problem at all. It could be that spilling your shuffles to disk is slowing you down (but probably shouldn't hang your application). For the 5 RDDs case, what happens if you set spark.shuffle.spill to false? 2014-06-17 5:59 GMT-07:00 MEETHU MATHEW meethu2...@yahoo.co.in: Hi all, I want to do a recursive leftOuterJoin between an RDD (created from file) with 9 million rows(size of the file is 100MB) and 30 other RDDs(created from 30 diff files in each iteration of a loop) varying from 1 to 6 million rows. When I run it for 5 RDDs,its running successfully in 5 minutes.But when I increase it to 10 or 30 RDDs its gradually slowing down and finally getting stuck without showing any warning or error. I am running in standalone mode with 2 workers of 4GB each and a total of 16 cores . Any of you facing similar problems with JOIN or is it a problem with my configuration. Thanks Regards, Meethu M
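Daniel's snippet, restored to readable Scala (formatting was lost above); files is assumed to be a Seq[String] of input paths and sc an existing SparkContext.
==
// One RDD built from a comma-joined list of paths...
val rdd1 = sc.textFile(files.mkString(","))
// ...versus a union of one RDD per file.
val rdd2 = sc.union(files.map(sc.textFile(_)))
==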
Re: Unit test failure: Address already in use
Hi, Could your problem come from the fact that you run your tests in parallel ? If you are spark in local mode, you cannot have concurrent spark instances running. this means that your tests instantiating sparkContext cannot be run in parallel. The easiest fix is to tell sbt to not run parallel tests. This can be done by adding the following line in your build.sbt: parallelExecution in Test := false Cheers, Anselme 2014-06-17 23:01 GMT+02:00 SK skrishna...@gmail.com: Hi, I have 3 unit tests (independent of each other) in the /src/test/scala folder. When I run each of them individually using: sbt test-only test, all the 3 pass the test. But when I run them all using sbt test, then they fail with the warning below. I am wondering if the binding exception results in failure to run the job, thereby causing the failure. If so, what can I do to address this binding exception? I am running these tests locally on a standalone machine (i.e. SparkContext(local, test)). 14/06/17 13:42:48 WARN component.AbstractLifeCycle: FAILED org.eclipse.jetty.server.Server@3487b78d: java.net.BindException: Address already in use java.net.BindException: Address already in use at sun.nio.ch.Net.bind0(Native Method) at sun.nio.ch.Net.bind(Net.java:174) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:139) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:77) thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unit-test-failure-Address-already-in-use-tp7771.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
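As a concrete sketch, the single setting Anselme mentions goes into build.sbt like this:
==
// Run test suites sequentially so that only one local SparkContext (and its
// embedded Jetty UI server) is bound at any time.
parallelExecution in Test := false
==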
Re: get schema from SchemaRDD
We just merged a feature into master that lets you print the schema or view it as a string (printSchema() and schemaTreeString on SchemaRDD). There is also this JIRA targeting 1.1 for presenting a nice programatic API for this information: https://issues.apache.org/jira/browse/SPARK-2179 On Wed, Jun 18, 2014 at 10:36 AM, Kevin Jung itsjb.j...@samsung.com wrote: Can I get schema information from SchemaRDD? For example, *case class Person(name:String, Age:Int, Gender:String, Birth:String) val peopleRDD = sc.textFile(/sample/sample.csv).map(_.split(,)).map(p = Person(p(0).toString, p(1).toInt, p(2).toString, p(3).toString)) peopleRDD.saveAsParquetFile(people.parquet)* (few days later...) *val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ val loadedPeopleRDD = sqlContext.parquetFile(people.parquet) loadedPeopleRDD.registerAsTable(peopleTable)* Someone who doesn't know Person class can't know what columns and types this table have. Maybe they want to get schema information from loadedPeopleRDD. How can I do this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/get-schema-from-SchemaRDD-tp7830.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
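A sketch of what that looks like against a recent master build, following the names in Michael's message (printSchema() and schemaTreeString on SchemaRDD); it assumes sc is an existing SparkContext and that people.parquet was written as in Kevin's example.
==
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val loadedPeopleRDD = sqlContext.parquetFile("people.parquet")
loadedPeopleRDD.registerAsTable("peopleTable")

// Pretty-print the column names and types inferred from the Parquet metadata.
loadedPeopleRDD.printSchema()
// The same information as a String, per the message above.
val schemaText: String = loadedPeopleRDD.schemaTreeString
==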
Re: rdd.cache() is not faster?
You cannot assume that caching would always reduce the execution time, especially if the data-set is large. It appears that if too much memory is used for caching, then less memory is left for the actual computation itself. There has to be a balance between the two. Page 33 of this thesis from KTH talks about this: http://www.diva-portal.org/smash/get/diva2:605106/FULLTEXT01.pdf Best - Gaurav Jain Master's Student, D-INFK ETH Zurich -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/rdd-cache-is-not-faster-tp7804p7835.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Cannot print a derived DStream after reduceByKey
I guess this is a basic question about the usage of reduce. Please shed some light, thank you! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-print-a-derived-DStream-after-reduceByKey-tp7834p7836.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Cannot print a derived DStream after reduceByKey
In the test application, I create a DStream by connect with a socket. Then I want to count the RDDs in the DStream which matches with another reference RDD. Below is the Java code for my application. == public class TestSparkStreaming { public static void main(String[] args) { // Function to make a pair of String class StringToPair implements PairFunctionString, String, String { String value_; StringToPair(String value) { value_ = value; } @Override public Tuple2String, String call(String arg0) throws Exception { return new Tuple2String, String(arg0, value_); } } JavaStreamingContext jssc = new JavaStreamingContext(local, TestSparkStreaming, new Duration(1000)); JavaReceiverInputDStreamString networkevents = jssc.socketTextStream(localhost, ); // Pair input line with world JavaPairDStreamString, String streamEvents = networkevents.mapToPair(new StringToPair(world)); // Construct hello - spark pair for input line to join with JavaSparkContext sc = new JavaSparkContext(new SparkConf()); ListString list = Arrays.asList(hello); JavaRDDString reference = sc.parallelize(list); final JavaPairRDDString, String referenceData = reference.mapToPair(new StringToPair(spark)); class MatchInputLine implements PairFunctionTuple2lt;String, String, String, Long { @Override public Tuple2String, Long call( Tuple2String, String t) throws Exception { final String inputKey = t._1; final String inputValue = t._2; final ListString ret = referenceData.lookup(inputKey); return new Tuple2String, Long(inputKey, new Long((ret != null) ? ret.size() : 0)); } } // Construct an output DStream if matched JavaPairDStreamString, Long joinedStream = streamEvents.mapToPair(new MatchInputLine()); // Count the output class Count implements Function2Long, Long, Long { @Override public Long call(Long v1, Long v2) throws Exception { return v1 + v2; } } JavaPairDStreamString, Long aggregatedJoinedStream = joinedStream.reduceByKey(new Count()); // Print the output aggregatedJoinedStream.count().print(); jssc.start(); jssc.awaitTermination(); } } == I'm testing on Windows in local mode (1.0.0). After I start the socket server (the nc program mentioned in Spark's document) and submit the packaged jar into Spark, I expect to see the output when I type hello in. However, I didn't see any output. I saw below message in the console where I submit the jar. 
== 14/06/18 18:17:48 INFO JobScheduler: Added jobs for time 1403086668000 ms 14/06/18 18:17:48 INFO MemoryStore: ensureFreeSpace(12) called with curMem=0, maxMem=1235327385 14/06/18 18:17:48 INFO MemoryStore: Block input-0-1403086668400 stored as bytesto memory (size 12.0 B, free 1178.1 MB) 14/06/18 18:17:48 INFO BlockManagerInfo: Added input-0-1403086668400 in memory on PEK-WKST68449:60769 (size: 12.0 B, free: 1178.1 MB) 14/06/18 18:17:48 INFO BlockManagerMaster: Updated info of block input-0-1403086668400 14/06/18 18:17:48 INFO SendingConnection: Initiating connection to [PEK-WKST68449/10.101.3.75:60769] 14/06/18 18:17:48 INFO ConnectionManager: Accepted connection from [PEK-WKST68449/10.101.3.75] 14/06/18 18:17:48 INFO SendingConnection: Connected to [PEK-WKST68449/10.101.3.75:60769], 1 messages pending 14/06/18 18:17:48 WARN BlockManager: Block input-0-1403086668400 already existson this machine; not re-adding it 14/06/18 18:17:48 INFO SendingConnection: Initiating connection to [/127.0.0.1:60789] 14/06/18 18:17:48 INFO ConnectionManager: Accepted connection from [127.0.0.1/127.0.0.1] 14/06/18 18:17:48 INFO SendingConnection: Connected to [/127.0.0.1:60789], 1 messages pending 14/06/18 18:17:48 INFO BlockGenerator: Pushed block input-0-1403086668400 14/06/18 18:17:49 INFO ReceiverTracker: Stream 0 received 1 blocks 14/06/18 18:17:49 INFO JobScheduler: Added jobs for time 1403086669000 ms == I see one Waiting Batches in Spark's monitoring UI. I'm
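For comparison, a hedged Scala sketch of one way to express the same matching: join the reference RDD in via DStream.transform instead of calling lookup() on it from inside the pair function. The master setting and port number here are placeholders, not the original ones.
==
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val conf = new SparkConf().setMaster("local[2]").setAppName("TestSparkStreaming")
val ssc  = new StreamingContext(conf, Seconds(1))

// Reference data: "hello" -> "spark", as in the Java program above.
val referenceData = ssc.sparkContext.parallelize(Seq("hello" -> "spark"))

// Pair every input line with "world".
val events = ssc.socketTextStream("localhost", 9999).map(line => line -> "world")

// Join each batch against the reference RDD, then count matches per key.
val counts = events
  .transform(rdd => rdd.join(referenceData).mapValues(_ => 1L))
  .reduceByKey(_ + _)

counts.print()

ssc.start()
ssc.awaitTermination()
==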
BSP realization on Spark
Hi, We are trying to implement a BSP model in Spark with the help of GraphX. One thing I encountered is a Pregel operator in Graph class. But what I fail to understand is how the Master and Worker needs to be assigned (BSP), and how barrier synchronization would happen. The pregel operator provides a way to define a vertex program, but nothing is mentioned about the barrier synchronization. Any help in this regard is truly appreciated. Many Thanks, Ghousia.
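For concreteness, a sketch of GraphX's Pregel operator (the single-source shortest-paths example from the GraphX guide). There is no explicit master/worker assignment to make: supersteps run as ordinary Spark jobs on the existing driver and executors, and the barrier is implicit, since the next superstep only starts after all messages from the current one have been sent and merged.
==
import org.apache.spark.graphx._

// Assumes a Graph[Double, Double] whose edge attributes hold distances.
def shortestPaths(graph: Graph[Double, Double], source: VertexId): Graph[Double, Double] = {
  val initial = graph.mapVertices((id, _) =>
    if (id == source) 0.0 else Double.PositiveInfinity)

  initial.pregel(Double.PositiveInfinity)(
    // Vertex program: keep the smaller of the current and incoming distance.
    (id, dist, newDist) => math.min(dist, newDist),
    // Send messages along edges that offer a shorter path.
    triplet =>
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      else
        Iterator.empty,
    // Merge messages arriving at the same vertex.
    (a, b) => math.min(a, b)
  )
}
==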
Re: Java IO Stream Corrupted - Invalid Type AC?
Patrick, My team is using shuffle consolidation but not speculation. We are also using persist(DISK_ONLY) for caching. Here are some config changes that are in our work-in-progress. We've been trying for 2 weeks to get our production flow (maybe around 50-70 stages, a few forks and joins with up to 20 branches in the forks) to run end to end without any success, running into other problems besides this one as well. For example, we have run into situations where saving to HDFS just hangs on a couple of tasks, which are printing out nothing in their logs and not taking any CPU. For testing, our input data is 10 GB across 320 input splits and generates maybe around 200-300 GB of intermediate and final data. conf.set(spark.executor.memory, 14g) // TODO make this configurable // shuffle configs conf.set(spark.default.parallelism, 320) // TODO make this configurable conf.set(spark.shuffle.consolidateFiles,true) conf.set(spark.shuffle.file.buffer.kb, 200) conf.set(spark.reducer.maxMbInFlight, 96) conf.set(spark.rdd.compress,true // we ran into a problem with the default timeout of 60 seconds // this is also being set in the master's spark-env.sh. Not sure if it needs to be in both places conf.set(spark.worker.timeout,180) // akka settings conf.set(spark.akka.threads, 300) conf.set(spark.akka.timeout, 180) conf.set(spark.akka.frameSize, 100) conf.set(spark.akka.batchSize, 30) conf.set(spark.akka.askTimeout, 30) // block manager conf.set(spark.storage.blockManagerTimeoutIntervalMs, 18) conf.set(spark.blockManagerHeartBeatMs, 8) -Suren On Wed, Jun 18, 2014 at 1:42 AM, Patrick Wendell pwend...@gmail.com wrote: Out of curiosity - are you guys using speculation, shuffle consolidation, or any other non-default option? If so that would help narrow down what's causing this corruption. On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman suren.hira...@velos.io wrote: Matt/Ryan, Did you make any headway on this? My team is running into this also. Doesn't happen on smaller datasets. Our input set is about 10 GB but we generate 100s of GBs in the flow itself. -Suren On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton compton.r...@gmail.com wrote: Just ran into this today myself. I'm on branch-1.0 using a CDH3 cluster (no modifications to Spark or its dependencies). The error appeared trying to run GraphX's .connectedComponents() on a ~200GB edge list (GraphX worked beautifully on smaller data). Here's the stacktrace (it's quite similar to yours https://imgur.com/7iBA4nJ ). 
14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed 4 times; aborting job 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at VertexRDD.scala:100 Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 5.599:39 failed 4 times, most recent failure: Exception failure in TID 29735 on host node18: java.io.StreamCorruptedException: invalid type code: AC java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355) java.io.ObjectInputStream.readObject(ObjectInputStream.java:350) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192) org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78) org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75) org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) org.apache.spark.scheduler.Task.run(Task.scala:51)
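For readability, the configuration listed earlier in this thread with string quoting restored; the values are reproduced exactly as they appear in the message.
==
import org.apache.spark.SparkConf

val conf = new SparkConf()
conf.set("spark.executor.memory", "14g")           // TODO make this configurable
// shuffle configs
conf.set("spark.default.parallelism", "320")       // TODO make this configurable
conf.set("spark.shuffle.consolidateFiles", "true")
conf.set("spark.shuffle.file.buffer.kb", "200")
conf.set("spark.reducer.maxMbInFlight", "96")
conf.set("spark.rdd.compress", "true")
// the default 60s timeout was a problem; also set in the master's spark-env.sh
conf.set("spark.worker.timeout", "180")
// akka settings
conf.set("spark.akka.threads", "300")
conf.set("spark.akka.timeout", "180")
conf.set("spark.akka.frameSize", "100")
conf.set("spark.akka.batchSize", "30")
conf.set("spark.akka.askTimeout", "30")
// block manager (values as given in the message)
conf.set("spark.storage.blockManagerTimeoutIntervalMs", "18")
conf.set("spark.blockManagerHeartBeatMs", "8")
==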
Re: Contribution to Spark MLLib
Hello everybody, Xiangrui, thanks for the link to the roadmap. I saw it is planned to implement LDA in MLlib 1.1. What do you think about PLSA? I understand that LDA is more popular now, but recent research shows that modifications of PLSA sometimes perform better [1]. Furthermore, the most recent paper by the same authors shows that there is a clear way to extend PLSA to LDA and beyond [2]. We could implement PLSA with these modifications in MLlib. Would that be of interest? Actually, we already have an implementation of Robust PLSA on Spark, so the task is to integrate it into MLlib. 1. A. Potapenko, K. Vorontsov. 2013. Robust PLSA performs better than LDA. In Proceedings of ECIR'13. 2. Vorontsov, Potapenko. Tutorial on Probabilistic Topic Modeling: Additive Regularization for Stochastic Matrix Factorization. http://www.machinelearning.ru/wiki/images/1/1f/Voron14aist.pdf Best regards, Denis. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Contribution-to-Spark-MLLib-tp7716p7844.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: rdd.cache() is not faster?
Hi Gaurav, thanks for your pointer. The observation in the link is (at least qualitatively) similar to mine. Now the question is, if I do have big data (40GB, cached size is 60GB) and even big memory (192 GB), I cannot benefit from RDD cache, and should persist on disk and leverage filesystem cache? I will try more workers so that each JVM has a smaller heap. Best regards, Wei - Wei Tan, PhD Research Staff Member IBM T. J. Watson Research Center http://researcher.ibm.com/person/us-wtan From: Gaurav Jain ja...@student.ethz.ch To: u...@spark.incubator.apache.org, Date: 06/18/2014 06:30 AM Subject:Re: rdd.cache() is not faster? You cannot assume that caching would always reduce the execution time, especially if the data-set is large. It appears that if too much memory is used for caching, then less memory is left for the actual computation itself. There has to be a balance between the two. Page 33 of this thesis from KTH talks about this: http://www.diva-portal.org/smash/get/diva2:605106/FULLTEXT01.pdf Best - Gaurav Jain Master's Student, D-INFK ETH Zurich -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/rdd-cache-is-not-faster-tp7804p7835.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: rdd.cache() is not faster?
if I do have big data (40GB, cached size is 60GB) and even big memory (192 GB), I cannot benefit from RDD cache, and should persist on disk and leverage filesystem cache? The answer to the question of whether to persist (spill-over) data on disk is not always immediately clear, because generally the functions to compute RDD partitions are not as expensive as retrieving the saved partition from disk. That's why, the default STORAGE_LEVEL never stores RDD partitions on disk, and instead computes them on the fly. Also, you can try using Kryo serialization (if not using it already) to reduce memory usage. Playing around with different Storage levels (MEMORY_ONLY_SER, for example) might also help. Best Gaurav Jain Master's Student, D-INFK ETH Zurich Email: jaing at student dot ethz dot ch - Gaurav Jain Master's Student, D-INFK ETH Zurich -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/rdd-cache-is-not-faster-tp7804p7846.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
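A hedged sketch of the two suggestions above, Kryo serialization plus a serialized storage level (optionally spilling to disk); the input path is a placeholder.
==
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("cache-tuning")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

val data = sc.textFile("hdfs:///path/to/40GB-input")

// MEMORY_ONLY_SER stores partitions as serialized bytes: a smaller footprint
// at the cost of CPU to deserialize on access.
data.persist(StorageLevel.MEMORY_ONLY_SER)

// MEMORY_AND_DISK_SER additionally spills partitions that do not fit in memory.
// data.persist(StorageLevel.MEMORY_AND_DISK_SER)
==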
Re: Wildcard support in input path
Is that month= syntax something special, or do your files actually have that string as part of their name? On Wed, Jun 18, 2014 at 2:25 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi all, Thanks for the reply. I'm using parquetFile as input, is that a problem? In hadoop fs -ls, the path (hdfs://domain/user/ jianshuang/data/parquet/table/month=2014*) will get list all the files. I'll test it again. Jianshi On Wed, Jun 18, 2014 at 2:23 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi Andrew, Strangely in my spark (1.0.0 compiled against hadoop 2.4.0) log, it says file not found. I'll try again. Jianshi On Wed, Jun 18, 2014 at 12:36 PM, Andrew Ash and...@andrewash.com wrote: In Spark you can use the normal globs supported by Hadoop's FileSystem, which are documented here: http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus(org.apache.hadoop.fs.Path) On Wed, Jun 18, 2014 at 12:09 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi Jianshi, I have used wild card characters (*) in my program and it worked.. My code was like this b = sc.textFile(hdfs:///path to file/data_file_2013SEP01*) Thanks Regards, Meethu M On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: It would be convenient if Spark's textFile, parquetFile, etc. can support path with wildcard, such as: hdfs://domain/user/jianshuang/data/parquet/table/month=2014* Or is there already a way to do it now? Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Wildcard support in input path
Hi Nicholas, month= is for Hive to auto discover the partitions. It's part of the url of my files. Jianshi On Wed, Jun 18, 2014 at 11:52 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Is that month= syntax something special, or do your files actually have that string as part of their name? On Wed, Jun 18, 2014 at 2:25 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi all, Thanks for the reply. I'm using parquetFile as input, is that a problem? In hadoop fs -ls, the path (hdfs://domain/user/ jianshuang/data/parquet/table/month=2014*) will get list all the files. I'll test it again. Jianshi On Wed, Jun 18, 2014 at 2:23 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi Andrew, Strangely in my spark (1.0.0 compiled against hadoop 2.4.0) log, it says file not found. I'll try again. Jianshi On Wed, Jun 18, 2014 at 12:36 PM, Andrew Ash and...@andrewash.com wrote: In Spark you can use the normal globs supported by Hadoop's FileSystem, which are documented here: http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus(org.apache.hadoop.fs.Path) On Wed, Jun 18, 2014 at 12:09 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi Jianshi, I have used wild card characters (*) in my program and it worked.. My code was like this b = sc.textFile(hdfs:///path to file/data_file_2013SEP01*) Thanks Regards, Meethu M On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: It would be convenient if Spark's textFile, parquetFile, etc. can support path with wildcard, such as: hdfs://domain/user/jianshuang/data/parquet/table/month=2014* Or is there already a way to do it now? Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Re: Wildcard support in input path
I wonder if that’s the problem. Is there an equivalent hadoop fs -ls command you can run that returns the same files you want but doesn’t have that month= string? On Wed, Jun 18, 2014 at 12:25 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi Nicholas, month= is for Hive to auto discover the partitions. It's part of the url of my files. Jianshi On Wed, Jun 18, 2014 at 11:52 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Is that month= syntax something special, or do your files actually have that string as part of their name? On Wed, Jun 18, 2014 at 2:25 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi all, Thanks for the reply. I'm using parquetFile as input, is that a problem? In hadoop fs -ls, the path (hdfs://domain/user/ jianshuang/data/parquet/table/month=2014*) will get list all the files. I'll test it again. Jianshi On Wed, Jun 18, 2014 at 2:23 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi Andrew, Strangely in my spark (1.0.0 compiled against hadoop 2.4.0) log, it says file not found. I'll try again. Jianshi On Wed, Jun 18, 2014 at 12:36 PM, Andrew Ash and...@andrewash.com wrote: In Spark you can use the normal globs supported by Hadoop's FileSystem, which are documented here: http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus(org.apache.hadoop.fs.Path) On Wed, Jun 18, 2014 at 12:09 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi Jianshi, I have used wild card characters (*) in my program and it worked.. My code was like this b = sc.textFile(hdfs:///path to file/data_file_2013SEP01*) Thanks Regards, Meethu M On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang jianshi.hu...@gmail.com wrote: It would be convenient if Spark's textFile, parquetFile, etc. can support path with wildcard, such as: hdfs://domain/user/jianshuang/data/parquet/table/month=2014* Or is there already a way to do it now? Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
HDFS folder .sparkStaging not deleted and filled up HDFS in yarn mode
Hi All, Has anyone run into the same problem? Looking at the source code in the official release (rc11), this property is set to false by default; however, I'm seeing that the .sparkStaging folder remains on HDFS and fills up the disk pretty fast, since SparkContext deploys the fat JAR file (~115MB) every time for each job and it is not cleaned up. yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala: val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
[test@spark ~]$ hdfs dfs -ls .sparkStaging
Found 46 items
drwx-- - test users 0 2014-05-01 01:42 .sparkStaging/application_1398370455828_0050
drwx-- - test users 0 2014-05-01 02:03 .sparkStaging/application_1398370455828_0051
drwx-- - test users 0 2014-05-01 02:04 .sparkStaging/application_1398370455828_0052
drwx-- - test users 0 2014-05-01 05:44 .sparkStaging/application_1398370455828_0053
drwx-- - test users 0 2014-05-01 05:45 .sparkStaging/application_1398370455828_0055
drwx-- - test users 0 2014-05-01 05:46 .sparkStaging/application_1398370455828_0056
drwx-- - test users 0 2014-05-01 05:49 .sparkStaging/application_1398370455828_0057
drwx-- - test users 0 2014-05-01 05:52 .sparkStaging/application_1398370455828_0058
drwx-- - test users 0 2014-05-01 05:58 .sparkStaging/application_1398370455828_0059
drwx-- - test users 0 2014-05-01 07:38 .sparkStaging/application_1398370455828_0060
drwx-- - test users 0 2014-05-01 07:41 .sparkStaging/application_1398370455828_0061
….
drwx-- - test users 0 2014-06-16 14:45 .sparkStaging/application_1402001910637_0131
drwx-- - test users 0 2014-06-16 15:03 .sparkStaging/application_1402001910637_0135
drwx-- - test users 0 2014-06-16 15:16 .sparkStaging/application_1402001910637_0136
drwx-- - test users 0 2014-06-16 15:46 .sparkStaging/application_1402001910637_0138
drwx-- - test users 0 2014-06-16 23:57 .sparkStaging/application_1402001910637_0157
drwx-- - test users 0 2014-06-17 05:55 .sparkStaging/application_1402001910637_0161
Is this something that needs to be explicitly set, e.g. SPARK_YARN_USER_ENV=spark.yarn.preserve.staging.files=false? Per http://spark.apache.org/docs/latest/running-on-yarn.html, spark.yarn.preserve.staging.files (default false): "Set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather than delete them." Or is this a bug where the default value is not honored and is overridden to true somewhere? Thanks.
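For completeness, a hedged sketch of setting the property explicitly from the application side; "false" mirrors the documented default that ApplicationMaster reads, so whether this changes the cleanup behaviour is exactly the question here.
==
import org.apache.spark.SparkConf

// Hypothetical explicit setting of the staging-files property.
val conf = new SparkConf()
  .setAppName("staging-cleanup-check")
  .set("spark.yarn.preserve.staging.files", "false")
==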
RE: HDFS folder .sparkStaging not deleted and filled up HDFS in yarn mode
Forgot to mention that I am using spark-submit to submit jobs, and a verbose mode print out looks like this with the SparkPi examples.The .sparkStaging won't be deleted. My thoughts is that this should be part of the staging and should be cleaned up as well when sc gets terminated. [test@ spark]$ SPARK_YARN_USER_ENV=spark.yarn.preserve.staging.files=false SPARK_JAR=./assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop2.2.0.jar ./bin/spark-submit --verbose --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi --driver-memory 512M --driver-library-path /opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo.jar --executor-memory 512M --executor-cores 1 --queue research --num-executors 2 examples/target/spark-examples_2.10-1.0.0.jar Using properties file: null Using properties file: null Parsed arguments: master yarn deployMode cluster executorMemory 512M executorCores 1 totalExecutorCores null propertiesFile null driverMemory512M driverCores null driverExtraClassPathnull driverExtraLibraryPath /opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo.jar driverExtraJavaOptions null supervise false queue research numExecutors2 files null pyFiles null archivesnull mainClass org.apache.spark.examples.SparkPi primaryResource file:/opt/spark/examples/target/spark-examples_2.10-1.0.0.jar nameorg.apache.spark.examples.SparkPi childArgs [] jarsnull verbose true Default properties from null: Using properties file: null Main class: org.apache.spark.deploy.yarn.Client Arguments: --jar file:/opt/spark/examples/target/spark-examples_2.10-1.0.0.jar --class org.apache.spark.examples.SparkPi --name org.apache.spark.examples.SparkPi --driver-memory 512M --queue research --num-executors 2 --executor-memory 512M --executor-cores 1 System properties: spark.driver.extraLibraryPath - /opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo.jar SPARK_SUBMIT - true spark.app.name - org.apache.spark.examples.SparkPi Classpath elements: From: alee...@hotmail.com To: user@spark.apache.org Subject: HDFS folder .sparkStaging not deleted and filled up HDFS in yarn mode Date: Wed, 18 Jun 2014 11:05:12 -0700 Hi All, Have anyone ran into the same problem? By looking at the source code in official release (rc11),this property settings is set to false by default, however, I'm seeing the .sparkStaging folder remains on the HDFS and causing it to fill up the disk pretty fast since SparkContext deploys the fat JAR file (~115MB) every time for each job and it is not cleaned up. 
yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala: val preserveFiles = sparkConf.get(spark.yarn.preserve.staging.files, false).toBoolean [test@spark ~]$ hdfs dfs -ls .sparkStagingFound 46 itemsdrwx-- - test users 0 2014-05-01 01:42 .sparkStaging/application_1398370455828_0050drwx-- - test users 0 2014-05-01 02:03 .sparkStaging/application_1398370455828_0051drwx-- - test users 0 2014-05-01 02:04 .sparkStaging/application_1398370455828_0052drwx-- - test users 0 2014-05-01 05:44 .sparkStaging/application_1398370455828_0053drwx-- - test users 0 2014-05-01 05:45 .sparkStaging/application_1398370455828_0055drwx-- - test users 0 2014-05-01 05:46 .sparkStaging/application_1398370455828_0056drwx-- - test users 0 2014-05-01 05:49 .sparkStaging/application_1398370455828_0057drwx-- - test users 0 2014-05-01 05:52 .sparkStaging/application_1398370455828_0058drwx-- - test users 0 2014-05-01 05:58 .sparkStaging/application_1398370455828_0059drwx-- - test users 0 2014-05-01 07:38 .sparkStaging/application_1398370455828_0060drwx-- - test users 0 2014-05-01 07:41 .sparkStaging/application_1398370455828_0061….drwx-- - test users 0 2014-06-16 14:45 .sparkStaging/application_1402001910637_0131drwx-- - test users 0 2014-06-16 15:03 .sparkStaging/application_1402001910637_0135drwx-- - test users 0 2014-06-16 15:16 .sparkStaging/application_1402001910637_0136drwx-- - test users 0 2014-06-16 15:46 .sparkStaging/application_1402001910637_0138drwx-- - test users 0 2014-06-16 23:57 .sparkStaging/application_1402001910637_0157drwx-- - test users 0 2014-06-17 05:55 .sparkStaging/application_1402001910637_0161 Is this something that needs to be explicitly set in :SPARK_YARN_USER_ENV=spark.yarn.preserve.staging.files=false
Re: question about setting SPARK_CLASSPATH IN spark_env.sh
by the way, any idea how to sync the spark config dir with other nodes in the cluster? ~santhosh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/question-about-setting-SPARK-CLASSPATH-IN-spark-env-sh-tp7809p7853.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Unit test failure: Address already in use
Disabling parallelExecution has worked for me. Other alternatives I’ve tried that also work include: 1. Using a lock – this will let tests execute in parallel except for those using a SparkContext. If you have a large number of tests that could execute in parallel, this can shave off some time. object TestingSparkContext { val lock = new Lock() } // before you instantiate your local SparkContext TestingSparkContext.lock.acquire() // after you call sc.stop() TestingSparkContext.lock.release() 2. Sharing a local SparkContext between tests. - This is nice because your tests will run faster. Start-up and shutdown is time consuming (can add a few seconds per test). - The downside is that your tests are using the same SparkContext so they are less independent of each other. I haven’t seen issues with this yet but there are likely some things that might crop up. Best, Todd From: Anselme Vignon [mailto:anselme.vig...@flaminem.com] Sent: Wednesday, June 18, 2014 12:33 AM To: user@spark.apache.org Subject: Re: Unit test failure: Address already in use Hi, Could your problem come from the fact that you run your tests in parallel ? If you are spark in local mode, you cannot have concurrent spark instances running. this means that your tests instantiating sparkContext cannot be run in parallel. The easiest fix is to tell sbt to not run parallel tests. This can be done by adding the following line in your build.sbt: parallelExecution in Test := false Cheers, Anselme 2014-06-17 23:01 GMT+02:00 SK skrishna...@gmail.commailto:skrishna...@gmail.com: Hi, I have 3 unit tests (independent of each other) in the /src/test/scala folder. When I run each of them individually using: sbt test-only test, all the 3 pass the test. But when I run them all using sbt test, then they fail with the warning below. I am wondering if the binding exception results in failure to run the job, thereby causing the failure. If so, what can I do to address this binding exception? I am running these tests locally on a standalone machine (i.e. SparkContext(local, test)). 14/06/17 13:42:48 WARN component.AbstractLifeCycle: FAILED org.eclipse.jetty.server.Server@3487b78dmailto:org.eclipse.jetty.server.Server@3487b78d: java.net.BindException: Address already in use java.net.BindException: Address already in use at sun.nio.ch.Net.bind0(Native Method) at sun.nio.ch.Net.bind(Net.java:174) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:139) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:77) thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unit-test-failure-Address-already-in-use-tp7771.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Unit test failure: Address already in use
In my unit tests I have a base class that all my tests extend that has a setup and teardown method that they inherit. They look something like this: var spark: SparkContext = _ @Before def setUp() { Thread.sleep(100L) //this seems to give spark more time to reset from the previous test's tearDown spark = new SparkContext(local, test spark) } @After def tearDown() { spark.stop spark = null //not sure why this helps but it does! System.clearProperty(spark.master.port) } It's been since last fall (i.e. version 0.8.x) since I've examined this code and so I can't vouch that it is still accurate/necessary - but it still works for me. On 06/18/2014 12:59 PM, Lisonbee, Todd wrote: Disabling parallelExecution has worked for me. Other alternatives I’ve tried that also work include: 1. Using a lock – this will let tests execute in parallel except for those using a SparkContext. If you have a large number of tests that could execute in parallel, this can shave off some time. object TestingSparkContext { val lock = new Lock() } // before you instantiate your local SparkContext TestingSparkContext.lock.acquire() // after you call sc.stop() TestingSparkContext.lock.release() 2. Sharing a local SparkContext between tests. - This is nice because your tests will run faster. Start-up and shutdown is time consuming (can add a few seconds per test). - The downside is that your tests are using the same SparkContext so they are less independent of each other. I haven’t seen issues with this yet but there are likely some things that might crop up. Best, Todd *From:*Anselme Vignon [mailto:anselme.vig...@flaminem.com] *Sent:* Wednesday, June 18, 2014 12:33 AM *To:* user@spark.apache.org *Subject:* Re: Unit test failure: Address already in use Hi, Could your problem come from the fact that you run your tests in parallel ? If you are spark in local mode, you cannot have concurrent spark instances running. this means that your tests instantiating sparkContext cannot be run in parallel. The easiest fix is to tell sbt to not run parallel tests. This can be done by adding the following line in your build.sbt: parallelExecution in Test := false Cheers, Anselme 2014-06-17 23:01 GMT+02:00 SK skrishna...@gmail.com mailto:skrishna...@gmail.com: Hi, I have 3 unit tests (independent of each other) in the /src/test/scala folder. When I run each of them individually using: sbt test-only test, all the 3 pass the test. But when I run them all using sbt test, then they fail with the warning below. I am wondering if the binding exception results in failure to run the job, thereby causing the failure. If so, what can I do to address this binding exception? I am running these tests locally on a standalone machine (i.e. SparkContext(local, test)). 14/06/17 13:42:48 WARN component.AbstractLifeCycle: FAILED org.eclipse.jetty.server.Server@3487b78d mailto:org.eclipse.jetty.server.Server@3487b78d: java.net.BindException: Address already in use java.net.BindException: Address already in use at sun.nio.ch.Net.bind0(Native Method) at sun.nio.ch.Net.bind(Net.java:174) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:139) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:77) thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unit-test-failure-Address-already-in-use-tp7771.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spark is now available via Homebrew
OS X / Homebrew users, It looks like you can now download Spark simply by doing: brew install apache-spark I’m new to Homebrew, so I’m not too sure how people are intended to use this. I’m guessing this would just be a convenient way to get the latest release onto your workstation, and from there use spark-ec2 to launch clusters. Anyway, just a cool thing to point out. Nick -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-is-now-available-via-Homebrew-tp7856.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: No Intercept for Python
Hi Naftali, Yes you're right. For now please add a column of ones. We are working on adding a weighted regularization term, and exposing the scala intercept option in the python binding. Best, Reza On Mon, Jun 16, 2014 at 12:19 PM, Naftali Harris naft...@affirm.com wrote: Hi everyone, The Python LogisticRegressionWithSGD does not appear to estimate an intercept. When I run the following, the returned weights and intercept are both 0.0: from pyspark import SparkContext from pyspark.mllib.regression import LabeledPoint from pyspark.mllib.classification import LogisticRegressionWithSGD def main(): sc = SparkContext(appName=NoIntercept) train = sc.parallelize([LabeledPoint(0, [0]), LabeledPoint(1, [0]), LabeledPoint(1, [0])]) model = LogisticRegressionWithSGD.train(train, iterations=500, step=0.1) print Final weights: + str(model.weights) print Final intercept: + str(model.intercept) if __name__ == __main__: main() Of course, one can fit an intercept with the simple expedient of adding a column of ones, but that's kind of annoying. Moreover, it looks like the scala version has an intercept option. Am I missing something? Should I just add the column of ones? If I submitted a PR doing that, is that the sort of thing you guys would accept? Thanks! :-) Naftali
Re: Spark is now available via Homebrew
Interesting, does anyone know the people over there who set it up? It would be good if Apache itself could publish packages there, though I’m not sure what’s involved. Since Spark just depends on Java and Python it should be easy for us to update. Matei On Jun 18, 2014, at 1:37 PM, Nick Chammas nicholas.cham...@gmail.com wrote: OS X / Homebrew users, It looks like you can now download Spark simply by doing: brew install apache-spark I’m new to Homebrew, so I’m not too sure how people are intended to use this. I’m guessing this would just be a convenient way to get the latest release onto your workstation, and from there use spark-ec2 to launch clusters. Anyway, just a cool thing to point out. Nick View this message in context: Spark is now available via Homebrew Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: No Intercept for Python
Thanks Reza! :-D Naftali On Wed, Jun 18, 2014 at 1:47 PM, Reza Zadeh r...@databricks.com wrote: Hi Naftali, Yes you're right. For now please add a column of ones. We are working on adding a weighted regularization term, and exposing the scala intercept option in the python binding. Best, Reza On Mon, Jun 16, 2014 at 12:19 PM, Naftali Harris naft...@affirm.com wrote: Hi everyone, The Python LogisticRegressionWithSGD does not appear to estimate an intercept. When I run the following, the returned weights and intercept are both 0.0: from pyspark import SparkContext from pyspark.mllib.regression import LabeledPoint from pyspark.mllib.classification import LogisticRegressionWithSGD def main(): sc = SparkContext(appName=NoIntercept) train = sc.parallelize([LabeledPoint(0, [0]), LabeledPoint(1, [0]), LabeledPoint(1, [0])]) model = LogisticRegressionWithSGD.train(train, iterations=500, step=0.1) print Final weights: + str(model.weights) print Final intercept: + str(model.intercept) if __name__ == __main__: main() Of course, one can fit an intercept with the simple expedient of adding a column of ones, but that's kind of annoying. Moreover, it looks like the scala version has an intercept option. Am I missing something? Should I just add the column of ones? If I submitted a PR doing that, is that the sort of thing you guys would accept? Thanks! :-) Naftali
Re: Spark is now available via Homebrew
Cool. Looked at the Pull Requests, the upgrade to 1.0.0 was just merged yesterday. https://github.com/Homebrew/homebrew/pull/30231 https://github.com/Homebrew/homebrew/blob/master/Library/Formula/apache-spark.rb On Wed, Jun 18, 2014 at 1:57 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Interesting, does anyone know the people over there who set it up? It would be good if Apache itself could publish packages there, though I'm not sure what's involved. Since Spark just depends on Java and Python it should be easy for us to update. Matei On Jun 18, 2014, at 1:37 PM, Nick Chammas nicholas.cham...@gmail.com wrote: OS X / Homebrew users, It looks like you can now download Spark simply by doing: brew install apache-spark I'm new to Homebrew, so I'm not too sure how people are intended to use this. I'm guessing this would just be a convenient way to get the latest release onto your workstation, and from there use spark-ec2 to launch clusters. Anyway, just a cool thing to point out. Nick -- View this message in context: Spark is now available via Homebrew http://apache-spark-user-list.1001560.n3.nabble.com/Spark-is-now-available-via-Homebrew-tp7856.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com. -- -Sheryl
Spark 0.9.1 java.lang.outOfMemoryError: Java Heap Space
I am trying to process a file that contains 4 log lines (not very long) and then write my parsed out case classes to a destination folder, and I get the following error: java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183) at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244) at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280) at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75) at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40) at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165) at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) Sadly, there are several folks that have faced this error while trying to execute Spark jobs and there are various solutions, none of which work for me a) I tried ( http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-java-lang-outOfMemoryError-Java-Heap-Space-td7735.html#a7736) changing the number of partitions in my RDD by using coalesce(8) and the error persisted b) I tried changing SPARK_WORKER_MEM=2g, SPARK_EXECUTOR_MEMORY=10g, and both did not work c) I strongly suspect there is a class path error ( http://apache-spark-user-list.1001560.n3.nabble.com/how-to-set-spark-executor-memory-and-heap-size-td4719.html) Mainly because the call stack is repetitive. Maybe the OOM error is a disguise ? 
d) I checked that I am not out of disk space and that I do not have too many open files (ulimit -u && sudo ls /proc/spark_master_process_id/fd | wc -l). I am also noticing multiple rounds of reflection happening to find the right class, I guess, so it could be a ClassNotFound error disguising itself as a memory error. Here are other threads that describe the same situation but have not been resolved in any way so far: http://apache-spark-user-list.1001560.n3.nabble.com/no-response-in-spark-web-UI-td4633.html http://apache-spark-user-list.1001560.n3.nabble.com/Spark-program-thows-OutOfMemoryError-td4268.html Any help is greatly appreciated. I am especially calling out to the creators of Spark and the Databricks folks; this seems like a known bug waiting to happen. Thanks, Shivani -- Software Engineer Analytics Engineering Team @ Box Mountain View, CA
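One hedged note for anyone debugging the same trace (not a confirmed fix): on a standalone cluster the worker-side variable is SPARK_WORKER_MEMORY (not SPARK_WORKER_MEM), and the heap each executor actually gets is the spark.executor.memory property requested by the driver, so setting it only in the workers' environment may silently do nothing. A minimal sketch of setting it from the application, with placeholder values:

import org.apache.spark.{SparkConf, SparkContext}

// a sketch, assuming a standalone cluster; the app name and sizes are placeholders
val conf = new SparkConf()
  .setAppName("log-parser")
  .set("spark.executor.memory", "4g")   // heap requested for each executor JVM
val sc = new SparkContext(conf)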
Re: Spark is now available via Homebrew
Agreed, it would be better if Apache controlled or managed this directly. I think making such a change is just a matter of opening a new issue https://github.com/Homebrew/homebrew/issues/new on the Homebrew issue tracker. I believe that's how Spark made it in there in the first place--it was just a user contribution. Nick On Wed, Jun 18, 2014 at 4:57 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Interesting, does anyone know the people over there who set it up? It would be good if Apache itself could publish packages there, though I’m not sure what’s involved. Since Spark just depends on Java and Python it should be easy for us to update. Matei On Jun 18, 2014, at 1:37 PM, Nick Chammas nicholas.cham...@gmail.com wrote: OS X / Homebrew users, It looks like you can now download Spark simply by doing: brew install apache-spark I’m new to Homebrew, so I’m not too sure how people are intended to use this. I’m guessing this would just be a convenient way to get the latest release onto your workstation, and from there use spark-ec2 to launch clusters. Anyway, just a cool thing to point out. Nick -- View this message in context: Spark is now available via Homebrew http://apache-spark-user-list.1001560.n3.nabble.com/Spark-is-now-available-via-Homebrew-tp7856.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Spark is now available via Homebrew
Matei, You might want to comment on that issue Sherl linked to, or perhaps this one https://github.com/Homebrew/homebrew/issues/30228, to ask about how Apache can manage this going forward. I know that mikemcquaid https://github.com/mikemcquaid is very active on the Homebrew repo and is one of the maintainers. Nick On Wed, Jun 18, 2014 at 5:17 PM, Sheryl John shery...@gmail.com wrote: Cool. Looked at the Pull Requests, the upgrade to 1.0.0 was just merged yesterday. https://github.com/Homebrew/homebrew/pull/30231 https://github.com/Homebrew/homebrew/blob/master/Library/Formula/apache-spark.rb On Wed, Jun 18, 2014 at 1:57 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Interesting, does anyone know the people over there who set it up? It would be good if Apache itself could publish packages there, though I’m not sure what’s involved. Since Spark just depends on Java and Python it should be easy for us to update. Matei On Jun 18, 2014, at 1:37 PM, Nick Chammas nicholas.cham...@gmail.com wrote: OS X / Homebrew users, It looks like you can now download Spark simply by doing: brew install apache-spark I’m new to Homebrew, so I’m not too sure how people are intended to use this. I’m guessing this would just be a convenient way to get the latest release onto your workstation, and from there use spark-ec2 to launch clusters. Anyway, just a cool thing to point out. Nick -- View this message in context: Spark is now available via Homebrew http://apache-spark-user-list.1001560.n3.nabble.com/Spark-is-now-available-via-Homebrew-tp7856.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com. -- -Sheryl
Re: Spark is now available via Homebrew
What's the advantage of Apache maintaining the brew installer vs users? Apache handling it means more work on this dev team, but probably a better experience for brew users. Just wanted to weigh pros/cons before committing to support this installation method. Andrew On Wed, Jun 18, 2014 at 5:29 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Matei, You might want to comment on that issue Sherl linked to, or perhaps this one https://github.com/Homebrew/homebrew/issues/30228, to ask about how Apache can manage this going forward. I know that mikemcquaid https://github.com/mikemcquaid is very active on the Homebrew repo and is one of the maintainers. Nick On Wed, Jun 18, 2014 at 5:17 PM, Sheryl John shery...@gmail.com wrote: Cool. Looked at the Pull Requests, the upgrade to 1.0.0 was just merged yesterday. https://github.com/Homebrew/homebrew/pull/30231 https://github.com/Homebrew/homebrew/blob/master/Library/Formula/apache-spark.rb On Wed, Jun 18, 2014 at 1:57 PM, Matei Zaharia matei.zaha...@gmail.com wrote: Interesting, does anyone know the people over there who set it up? It would be good if Apache itself could publish packages there, though I’m not sure what’s involved. Since Spark just depends on Java and Python it should be easy for us to update. Matei On Jun 18, 2014, at 1:37 PM, Nick Chammas nicholas.cham...@gmail.com wrote: OS X / Homebrew users, It looks like you can now download Spark simply by doing: brew install apache-spark I’m new to Homebrew, so I’m not too sure how people are intended to use this. I’m guessing this would just be a convenient way to get the latest release onto your workstation, and from there use spark-ec2 to launch clusters. Anyway, just a cool thing to point out. Nick -- View this message in context: Spark is now available via Homebrew http://apache-spark-user-list.1001560.n3.nabble.com/Spark-is-now-available-via-Homebrew-tp7856.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com. -- -Sheryl
Re: Spark 0.9.1 java.lang.outOfMemoryError: Java Heap Space
Wait, so the file only has four lines and the job running out of heap space? Can you share the code you're running that does the processing? I'd guess that you're doing some intense processing on every line but just writing parsed case classes back to disk sounds very lightweight. I On Wed, Jun 18, 2014 at 5:17 PM, Shivani Rao raoshiv...@gmail.com wrote: I am trying to process a file that contains 4 log lines (not very long) and then write my parsed out case classes to a destination folder, and I get the following error: java.lang.OutOfMemoryError: Java heap space at org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183) at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244) at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280) at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75) at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40) at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165) at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) Sadly, there are several folks that have faced this error while trying to execute Spark jobs and there are various solutions, none of which work for me a) I tried ( http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-java-lang-outOfMemoryError-Java-Heap-Space-td7735.html#a7736) changing the number of partitions in my RDD by using coalesce(8) and the error persisted b) I tried changing SPARK_WORKER_MEM=2g, SPARK_EXECUTOR_MEMORY=10g, and both did not work c) I strongly suspect there is a class path error ( 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-set-spark-executor-memory-and-heap-size-td4719.html) Mainly because the call stack is repetitive. Maybe the OOM error is a disguise ? d) I checked that i am not out of disk space and that i do not have too many open files (ulimit -u sudo ls /proc/spark_master_process_id/fd | wc -l) I am also noticing multiple reflections happening to find the right class i guess, so it could be class Not Found: error disguising itself as a memory error. Here are other threads that are encountering same situation .. but have not been resolved in any way so far.. http://apache-spark-user-list.1001560.n3.nabble.com/no-response-in-spark-web-UI-td4633.html http://apache-spark-user-list.1001560.n3.nabble.com/Spark-program-thows-OutOfMemoryError-td4268.html Any help is greatly appreciated. I am especially calling out on creators of Spark and Databrick folks. This seems like a known bug waiting to happen. Thanks, Shivani -- Software Engineer Analytics Engineering Team@ Box Mountain View, CA
java.lang.OutOfMemoryError with saveAsTextFile
Hi, I have a 5 million record, 300 column data set. I am running a spark job in yarn-cluster mode, with the following args: --driver-memory 11G --executor-memory 11G --executor-cores 16 --num-executors 500. The spark job replaces all categorical variables with some integers. I am getting the below error when I try to save the transformed data set. java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit exceeded) java.util.Arrays.copyOfRange(Arrays.java:3209) java.lang.String.init(String.java:215) java.lang.StringBuilder.toString(StringBuilder.java:430) java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3023) java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2819) java.io.ObjectInputStream.readString(ObjectInputStream.java:1598) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1319) java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946) java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) java.io.ObjectInputStream.readObject(ObjectInputStream.java:350) org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125) org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30) org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176) scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45) scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) scala.collection.AbstractIterator.to(Iterator.scala:1157) scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257) scala.collection.AbstractIterator.toList(Iterator.scala:1157) scala.collection.immutable.List.$plus$plus(List.scala:193) DataProcessor.DataTypeConverter2$$anonfun$6.apply(DataTypeConverter2.scala:137) DataProcessor.DataTypeConverter2$$anonfun$6.apply(DataTypeConverter2.scala:137) The code is as follows:

val transformedData = splitFileWithHeader.flatMap(rowArray => {
  try {
    if (rowArray.sameElements(header.value)) {
      None
    } else {
      val transformedArray: Array[String] = new Array[String](rowArray.length)
      for (i <- 0 until rowArray.length) {
        // Check 1 to see if the value should be replaced, Check 2 to see if it's a null value, in which case we do not update the value
        if (broadcastReplacements.value(i) != null && rowArray(i).trim.toString != "") {
          transformedArray.update(i, broadcastReplacements.value(i)(rowArray(i).trim.toString).toString)
        } else {
          transformedArray.update(i, rowArray(i).trim.toString)
        }
      }
      Array(transformedArray.deep.mkString(","))
    }
  } catch {
    case _: Throwable => {
      println("Failure in transforming the file, 1 line, Around Line 131")
      None
    }
  }
}).coalesce(1, true).mapPartitions(it => (Seq(headerLine.value) ++ it).iterator, true).coalesce(500)

//Save the Transformed Data File
transformedData.saveAsTextFile(outputFileLocation)

Any idea how I can resolve this error? Previous stages have completed successfully. Thank You! Vinay

Prior Stages:

val dataFile = sc.textFile(args(1), 500)
//Get the first line which is the header, which would also contain the column type
val columnDefinition = dataFile.first
val headerLine = sc.broadcast(columnDefinition)
val header = sc.broadcast(columnDefinition.split(",", -1))
// Remove the Header
val modifiedDataFile = dataFile.filter(line => line != headerLine.value)
val onlySplitFile = modifiedDataFile.flatMap(line => {
  try {
    //println(line.split(' ').length)
    //println(line.split(' '))
    if (line.split(',').length < 1 || line.split(',').sameElements(Array())) {
      None
    } else {
      Array(line.split(",", -1))
    }
  } catch {
    case _: Throwable => None
  }
})
modifiedDataFile.unpersist(true)
val currentColumn = sc.broadcast(i)
val distinctValues = onlySplitFile.flatMap(rowArray => { try {
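One hedged observation rather than a definitive diagnosis: the coalesce(1, true) step funnels the whole 5-million-row, 300-column dataset through a single task just to prepend the header, which is a common way to exhaust a single JVM's heap. A sketch of keeping the write parallel and emitting the header only in the first partition, using the headerLine broadcast from the code above:

// assumption: transformedData here is the RDD[String] produced by the flatMap above,
// without the trailing coalesce(1, true) ... coalesce(500) chain
val withHeader = transformedData.mapPartitionsWithIndex { (idx, rows) =>
  if (idx == 0) Iterator(headerLine.value) ++ rows else rows
}
withHeader.saveAsTextFile(outputFileLocation)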
Spark streaming and rate limit
Hi to all, in my use case I'd like to receive events and call an external service as they pass through. Is it possible to limit the number of concurrent calls to that service (to avoid DoS-ing it) using Spark Streaming? If so, limiting the rate implies possible buffer growth... how can I control the buffer of incoming events waiting to be processed? Best, Flavio
Re: Issue while trying to aggregate with a sliding window
Ok that patch does fix the key lookup exception. However, curious about the time validity check..isValidTime ( https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L264 ) Why does (time - zerotime) have to be a multiple of slide duration ? Shouldn't the reduceByKeyAndWindow aggregate every record in a given window (zeroTime to zeroTime+windowDuration)? On Tue, Jun 17, 2014 at 10:55 PM, Hatch M hatchman1...@gmail.com wrote: Thanks! Will try to get the fix and retest. On Tue, Jun 17, 2014 at 5:30 PM, onpoq l onpo...@gmail.com wrote: There is a bug: https://github.com/apache/spark/pull/961#issuecomment-45125185 On Tue, Jun 17, 2014 at 8:19 PM, Hatch M hatchman1...@gmail.com wrote: Trying to aggregate over a sliding window, playing with the slide duration. Playing around with the slide interval I can see the aggregation works but mostly fails with the below error. The stream has records coming in at 100ms. JavaPairDStreamString, AggregateObject aggregatedDStream = pairsDStream.reduceByKeyAndWindow(aggFunc, new Duration(6), new Duration(60)); 14/06/18 00:14:46 INFO dstream.ShuffledDStream: Time 1403050486900 ms is invalid as zeroTime is 1403050485800 ms and slideDuration is 6 ms and difference is 1100 ms 14/06/18 00:14:46 ERROR actor.OneForOneStrategy: key not found: 1403050486900 ms java.util.NoSuchElementException: key not found: 1403050486900 ms at scala.collection.MapLike$class.default(MapLike.scala:228) Any hints on whats going on here? Thanks! Hatch
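For background on the isValidTime check (a hedged explanation, not from the thread itself): a windowed DStream only generates an RDD at times that are zeroTime plus a whole number of slide intervals, so both the window and the slide duration have to be multiples of the batch interval of the parent stream; durations that do not line up produce exactly the "Time ... is invalid" message above. A small sketch of durations that do line up, assuming a 1-second batch and a placeholder socket source:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

// a sketch; conf is an existing SparkConf, the source and reduce function are placeholders
val ssc = new StreamingContext(conf, Seconds(1))
val pairs = ssc.socketTextStream("localhost", 9999).map(line => (line, 1))
// 60 s window sliding every 10 s, both whole multiples of the 1 s batch interval
val windowed = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(10))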
Re: Spark streaming and rate limit
You can add a back-pressure-enabled component in front that feeds data into Spark. This component can control the input rate into Spark. On Jun 18, 2014, at 6:13 PM, Flavio Pompermaier pomperma...@okkam.it wrote: Hi to all, in my use case I'd like to receive events and call an external service as they pass through. Is it possible to limit the number of contemporaneous call to that service (to avoid DoS) using Spark streaming? if so, limiting the rate implies a possible buffer growth...how can I control the buffer of incoming events waiting to be processed? Best, Flavio
Re: Spark streaming and rate limit
Thanks for the quick reply soumya. Unfortunately I'm a newbie with Spark..what do you mean? is there any reference to how to do that? On Thu, Jun 19, 2014 at 12:24 AM, Soumya Simanta soumya.sima...@gmail.com wrote: You can add a back pressured enabled component in front that feeds data into Spark. This component can control in input rate to spark. On Jun 18, 2014, at 6:13 PM, Flavio Pompermaier pomperma...@okkam.it wrote: Hi to all, in my use case I'd like to receive events and call an external service as they pass through. Is it possible to limit the number of contemporaneous call to that service (to avoid DoS) using Spark streaming? if so, limiting the rate implies a possible buffer growth...how can I control the buffer of incoming events waiting to be processed? Best, Flavio
create SparkContext dynamically
Hi all, I am setting up a system where spark contexts would be created by a web server that would handle the computation and return the results. I have the following code (in python):

os.environ['SPARK_HOME'] = "/home/spark/spark-1.0.0-bin-hadoop2/"
sc = SparkContext(master="spark://ip-xx-xx-xx-xx:7077", appName="Simple App")
l = sc.parallelize([1, 2, 3, 4])
c = l.count()

but it throws an unrelated error 'TypeError: an integer is required' in the last line. I assume I did not set up the environment properly. I have added spark_home and py4j source to the classpath. Not sure what is missing. thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/create-SparkContext-dynamically-tp7872.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Trailing Tasks Saving to HDFS
I have a flow that ends with saveAsTextFile() to HDFS. It seems all the expected files per partition have been written out, based on the number of part files and the file sizes. But the driver logs show 2 tasks still not completed and has no activity and the worker logs show no activity for those two tasks for a while now. Has anyone run into this situation? It's happened to me a couple of times now. Thanks. -- Suren SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@v suren.hira...@sociocast.comelos.io W: www.velos.io
Patterns for making multiple aggregations in one pass
The following is a simplified example of what I am trying to accomplish. Say I have an RDD of objects like this: { country: USA, name: Franklin, age: 24, hits: 224} { country: USA, name: Bob, age: 55, hits: 108} { country: France, name: Remi, age: 33, hits: 72} I want to find the average age and total number of hits per country. Ideally, I would like to scan the data once and perform both aggregations simultaneously. What is a good approach to doing this? I’m thinking that we’d want to keyBy(country), and then somehow reduceByKey(). The problem is, I don’t know how to approach writing a function that can be passed to reduceByKey() and that will track a running average and total simultaneously. Nick -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Patterns-for-making-multiple-aggregations-in-one-pass-tp7874.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Patterns for making multiple aggregations in one pass
Hi Nick, Instead of using reduceByKey(), you might want to look into using aggregateByKey(), which allows you to return a different value type U instead of the input value type V for each input tuple (K, V). You can define U to be a datatype that holds both the average and total and have seqOp update both fields of U in a single pass. Hope this makes sense, Doris On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas nicholas.cham...@gmail.com wrote: The following is a simplified example of what I am trying to accomplish. Say I have an RDD of objects like this: { country: USA, name: Franklin, age: 24, hits: 224} { country: USA, name: Bob, age: 55, hits: 108} { country: France, name: Remi, age: 33, hits: 72} I want to find the average age and total number of hits per country. Ideally, I would like to scan the data once and perform both aggregations simultaneously. What is a good approach to doing this? I’m thinking that we’d want to keyBy(country), and then somehow reduceByKey(). The problem is, I don’t know how to approach writing a function that can be passed to reduceByKey() and that will track a running average and total simultaneously. Nick -- View this message in context: Patterns for making multiple aggregations in one pass http://apache-spark-user-list.1001560.n3.nabble.com/Patterns-for-making-multiple-aggregations-in-one-pass-tp7874.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
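To make the aggregateByKey suggestion concrete, here is a rough sketch against the example records. The Person case class and the people RDD are assumptions for illustration, and aggregateByKey itself only appears in releases after 1.0.0, as noted later in this thread:

case class Person(country: String, name: String, age: Int, hits: Long)

// accumulator U = (sum of ages, record count, sum of hits)
val zero = (0L, 0L, 0L)
val perCountry = people
  .keyBy(_.country)
  .aggregateByKey(zero)(
    (acc, p) => (acc._1 + p.age, acc._2 + 1, acc._3 + p.hits),   // seqOp: fold one record into U
    (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3))           // combOp: merge partial results
  .mapValues { case (ageSum, n, hitSum) => (ageSum.toDouble / n, hitSum) }  // (avg age, total hits)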
Re: Patterns for making multiple aggregations in one pass
Ah, this looks like exactly what I need! It looks like this was recently added into PySpark https://github.com/apache/spark/pull/705/files#diff-6 (and Spark Core), but it's not in the 1.0.0 release. Thank you. Nick On Wed, Jun 18, 2014 at 7:42 PM, Doris Xin doris.s@gmail.com wrote: Hi Nick, Instead of using reduceByKey(), you might want to look into using aggregateByKey(), which allows you to return a different value type U instead of the input value type V for each input tuple (K, V). You can define U to be a datatype that holds both the average and total and have seqOp update both fields of U in a single pass. Hope this makes sense, Doris On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas nicholas.cham...@gmail.com wrote: The following is a simplified example of what I am trying to accomplish. Say I have an RDD of objects like this: { country: USA, name: Franklin, age: 24, hits: 224} { country: USA, name: Bob, age: 55, hits: 108} { country: France, name: Remi, age: 33, hits: 72} I want to find the average age and total number of hits per country. Ideally, I would like to scan the data once and perform both aggregations simultaneously. What is a good approach to doing this? I’m thinking that we’d want to keyBy(country), and then somehow reduceByKey(). The problem is, I don’t know how to approach writing a function that can be passed to reduceByKey() and that will track a running average and total simultaneously. Nick -- View this message in context: Patterns for making multiple aggregations in one pass http://apache-spark-user-list.1001560.n3.nabble.com/Patterns-for-making-multiple-aggregations-in-one-pass-tp7874.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Patterns for making multiple aggregations in one pass
This looks like a job for SparkSQL!

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

case class MyRecord(country: String, name: String, age: Int, hits: Long)
val data = sc.parallelize(Array(MyRecord("USA", "Franklin", 24, 234), MyRecord("USA", "Bob", 55, 108), MyRecord("France", "Remi", 33, 72)))
data.registerAsTable("MyRecords")
val results = sql("SELECT t.country, AVG(t.age), SUM(t.hits) FROM MyRecords t GROUP BY t.country").collect

Now results contains: Array[org.apache.spark.sql.Row] = Array([France,33.0,72], [USA,39.5,342]) On Wed, Jun 18, 2014 at 4:42 PM, Doris Xin doris.s@gmail.com wrote: Hi Nick, Instead of using reduceByKey(), you might want to look into using aggregateByKey(), which allows you to return a different value type U instead of the input value type V for each input tuple (K, V). You can define U to be a datatype that holds both the average and total and have seqOp update both fields of U in a single pass. Hope this makes sense, Doris On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas nicholas.cham...@gmail.com wrote: The following is a simplified example of what I am trying to accomplish. Say I have an RDD of objects like this: { country: USA, name: Franklin, age: 24, hits: 224} { country: USA, name: Bob, age: 55, hits: 108} { country: France, name: Remi, age: 33, hits: 72} I want to find the average age and total number of hits per country. Ideally, I would like to scan the data once and perform both aggregations simultaneously. What is a good approach to doing this? I’m thinking that we’d want to keyBy(country), and then somehow reduceByKey(). The problem is, I don’t know how to approach writing a function that can be passed to reduceByKey() and that will track a running average and total simultaneously. Nick -- View this message in context: Patterns for making multiple aggregations in one pass http://apache-spark-user-list.1001560.n3.nabble.com/Patterns-for-making-multiple-aggregations-in-one-pass-tp7874.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Patterns for making multiple aggregations in one pass
I was going to suggest the same thing :). On Jun 18, 2014, at 4:56 PM, Evan R. Sparks evan.spa...@gmail.com wrote: This looks like a job for SparkSQL! val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ case class MyRecord(country: String, name: String, age: Int, hits: Long) val data = sc.parallelize(Array(MyRecord(USA, Franklin, 24, 234), MyRecord(USA, Bob, 55, 108), MyRecord(France, Remi, 33, 72))) data.registerAsTable(MyRecords) val results = sql(SELECT t.country, AVG(t.age), SUM(t.hits) FROM MyRecords t GROUP BY t.country).collect Now results contains: Array[org.apache.spark.sql.Row] = Array([France,33.0,72], [USA,39.5,342]) On Wed, Jun 18, 2014 at 4:42 PM, Doris Xin doris.s@gmail.com wrote: Hi Nick, Instead of using reduceByKey(), you might want to look into using aggregateByKey(), which allows you to return a different value type U instead of the input value type V for each input tuple (K, V). You can define U to be a datatype that holds both the average and total and have seqOp update both fields of U in a single pass. Hope this makes sense, Doris On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas nicholas.cham...@gmail.com wrote: The following is a simplified example of what I am trying to accomplish. Say I have an RDD of objects like this: { country: USA, name: Franklin, age: 24, hits: 224 } { country: USA, name: Bob, age: 55, hits: 108 } { country: France, name: Remi, age: 33, hits: 72 } I want to find the average age and total number of hits per country. Ideally, I would like to scan the data once and perform both aggregations simultaneously. What is a good approach to doing this? I’m thinking that we’d want to keyBy(country), and then somehow reduceByKey(). The problem is, I don’t know how to approach writing a function that can be passed to reduceByKey() and that will track a running average and total simultaneously. Nick View this message in context: Patterns for making multiple aggregations in one pass Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Patterns for making multiple aggregations in one pass
That’s pretty neat! So I guess if you start with an RDD of objects, you’d first do something like RDD.map(lambda x: Record(x['field_1'], x['field_2'], ...)) in order to register it as a table, and from there run your aggregates. Very nice. On Wed, Jun 18, 2014 at 7:56 PM, Evan R. Sparks evan.spa...@gmail.com wrote: This looks like a job for SparkSQL! val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ case class MyRecord(country: String, name: String, age: Int, hits: Long) val data = sc.parallelize(Array(MyRecord(USA, Franklin, 24, 234), MyRecord(USA, Bob, 55, 108), MyRecord(France, Remi, 33, 72))) data.registerAsTable(MyRecords) val results = sql(SELECT t.country, AVG(t.age), SUM(t.hits) FROM MyRecords t GROUP BY t.country).collect Now results contains: Array[org.apache.spark.sql.Row] = Array([France,33.0,72], [USA,39.5,342]) On Wed, Jun 18, 2014 at 4:42 PM, Doris Xin doris.s@gmail.com wrote: Hi Nick, Instead of using reduceByKey(), you might want to look into using aggregateByKey(), which allows you to return a different value type U instead of the input value type V for each input tuple (K, V). You can define U to be a datatype that holds both the average and total and have seqOp update both fields of U in a single pass. Hope this makes sense, Doris On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas nicholas.cham...@gmail.com wrote: The following is a simplified example of what I am trying to accomplish. Say I have an RDD of objects like this: { country: USA, name: Franklin, age: 24, hits: 224} { country: USA, name: Bob, age: 55, hits: 108} { country: France, name: Remi, age: 33, hits: 72} I want to find the average age and total number of hits per country. Ideally, I would like to scan the data once and perform both aggregations simultaneously. What is a good approach to doing this? I’m thinking that we’d want to keyBy(country), and then somehow reduceByKey(). The problem is, I don’t know how to approach writing a function that can be passed to reduceByKey() and that will track a running average and total simultaneously. Nick -- View this message in context: Patterns for making multiple aggregations in one pass http://apache-spark-user-list.1001560.n3.nabble.com/Patterns-for-making-multiple-aggregations-in-one-pass-tp7874.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Patterns for making multiple aggregations in one pass
If your input data is JSON, you can also try out the recently merged in initial JSON support: https://github.com/apache/spark/commit/d2f4f30b12f99358953e2781957468e2cfe3c916 On Wed, Jun 18, 2014 at 5:27 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: That’s pretty neat! So I guess if you start with an RDD of objects, you’d first do something like RDD.map(lambda x: Record(x['field_1'], x['field_2'], ...)) in order to register it as a table, and from there run your aggregates. Very nice. On Wed, Jun 18, 2014 at 7:56 PM, Evan R. Sparks evan.spa...@gmail.com wrote: This looks like a job for SparkSQL! val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ case class MyRecord(country: String, name: String, age: Int, hits: Long) val data = sc.parallelize(Array(MyRecord(USA, Franklin, 24, 234), MyRecord(USA, Bob, 55, 108), MyRecord(France, Remi, 33, 72))) data.registerAsTable(MyRecords) val results = sql(SELECT t.country, AVG(t.age), SUM(t.hits) FROM MyRecords t GROUP BY t.country).collect Now results contains: Array[org.apache.spark.sql.Row] = Array([France,33.0,72], [USA,39.5,342]) On Wed, Jun 18, 2014 at 4:42 PM, Doris Xin doris.s@gmail.com wrote: Hi Nick, Instead of using reduceByKey(), you might want to look into using aggregateByKey(), which allows you to return a different value type U instead of the input value type V for each input tuple (K, V). You can define U to be a datatype that holds both the average and total and have seqOp update both fields of U in a single pass. Hope this makes sense, Doris On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas nicholas.cham...@gmail.com wrote: The following is a simplified example of what I am trying to accomplish. Say I have an RDD of objects like this: { country: USA, name: Franklin, age: 24, hits: 224 } { country: USA, name: Bob, age: 55, hits: 108 } { country: France, name: Remi, age: 33, hits: 72 } I want to find the average age and total number of hits per country. Ideally, I would like to scan the data once and perform both aggregations simultaneously. What is a good approach to doing this? I’m thinking that we’d want to keyBy(country), and then somehow reduceByKey(). The problem is, I don’t know how to approach writing a function that can be passed to reduceByKey() and that will track a running average and total simultaneously. Nick View this message in context: Patterns for making multiple aggregations in one pass Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Trailing Tasks Saving to HDFS
Looks like eventually there was some type of reset or timeout and the tasks have been reassigned. I'm guessing they'll keep failing until max failure count. The machine it disconnected from was a remote machine, though I've seen such failures from connections to itself with other problems. The log lines from the remote machine are also below. Any thoughts or guesses would be appreciated! *HUNG WORKER* 14/06/18 19:41:18 WARN network.ReceivingConnection: Error reading from connection to ConnectionManagerId(172.16.25.103,57626) java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcher.read0(Native Method) at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251) at sun.nio.ch.IOUtil.read(IOUtil.java:224) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254) at org.apache.spark.network.ReceivingConnection.read(Connection.scala:496) at org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:175) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:679) 14/06/18 19:41:18 INFO network.ConnectionManager: Handling connection error on connection to ConnectionManagerId(172.16.25.103,57626) 14/06/18 19:41:18 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.103,57626) 14/06/18 19:41:18 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(172.16.25.103,57626) 14/06/18 19:41:18 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.103,57626) 14/06/18 19:41:18 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found *REMOTE WORKER* 14/06/18 19:41:18 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(172.16.25.124,55610) 14/06/18 19:41:18 ERROR network.ConnectionManager: Corresponding SendingConnectionManagerId not found On Wed, Jun 18, 2014 at 7:16 PM, Surendranauth Hiraman suren.hira...@velos.io wrote: I have a flow that ends with saveAsTextFile() to HDFS. It seems all the expected files per partition have been written out, based on the number of part files and the file sizes. But the driver logs show 2 tasks still not completed and has no activity and the worker logs show no activity for those two tasks for a while now. Has anyone run into this situation? It's happened to me a couple of times now. Thanks. -- Suren SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@v suren.hira...@sociocast.comelos.io W: www.velos.io -- SUREN HIRAMAN, VP TECHNOLOGY Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR NEW YORK, NY 10001 O: (917) 525-2466 ext. 105 F: 646.349.4063 E: suren.hiraman@v suren.hira...@sociocast.comelos.io W: www.velos.io
Re: Patterns for making multiple aggregations in one pass
This is exciting! Here is the relevant alpha doc http://yhuai.github.io/site/sql-programming-guide.html#json-datasets for this feature, for others reading this. I'm going to try this out. Will this be released with 1.1.0? On Wed, Jun 18, 2014 at 8:31 PM, Zongheng Yang zonghen...@gmail.com wrote: If your input data is JSON, you can also try out the recently merged in initial JSON support: https://github.com/apache/spark/commit/d2f4f30b12f99358953e2781957468e2cfe3c916 On Wed, Jun 18, 2014 at 5:27 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: That’s pretty neat! So I guess if you start with an RDD of objects, you’d first do something like RDD.map(lambda x: Record(x['field_1'], x['field_2'], ...)) in order to register it as a table, and from there run your aggregates. Very nice. On Wed, Jun 18, 2014 at 7:56 PM, Evan R. Sparks evan.spa...@gmail.com wrote: This looks like a job for SparkSQL! val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ case class MyRecord(country: String, name: String, age: Int, hits: Long) val data = sc.parallelize(Array(MyRecord(USA, Franklin, 24, 234), MyRecord(USA, Bob, 55, 108), MyRecord(France, Remi, 33, 72))) data.registerAsTable(MyRecords) val results = sql(SELECT t.country, AVG(t.age), SUM(t.hits) FROM MyRecords t GROUP BY t.country).collect Now results contains: Array[org.apache.spark.sql.Row] = Array([France,33.0,72], [USA,39.5,342]) On Wed, Jun 18, 2014 at 4:42 PM, Doris Xin doris.s@gmail.com wrote: Hi Nick, Instead of using reduceByKey(), you might want to look into using aggregateByKey(), which allows you to return a different value type U instead of the input value type V for each input tuple (K, V). You can define U to be a datatype that holds both the average and total and have seqOp update both fields of U in a single pass. Hope this makes sense, Doris On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas nicholas.cham...@gmail.com wrote: The following is a simplified example of what I am trying to accomplish. Say I have an RDD of objects like this: { country: USA, name: Franklin, age: 24, hits: 224 } { country: USA, name: Bob, age: 55, hits: 108 } { country: France, name: Remi, age: 33, hits: 72 } I want to find the average age and total number of hits per country. Ideally, I would like to scan the data once and perform both aggregations simultaneously. What is a good approach to doing this? I’m thinking that we’d want to keyBy(country), and then somehow reduceByKey(). The problem is, I don’t know how to approach writing a function that can be passed to reduceByKey() and that will track a running average and total simultaneously. Nick View this message in context: Patterns for making multiple aggregations in one pass Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark streaming and rate limit
Flavio - I'm new to Spark as well, but I've done stream processing using other frameworks. My comments below are not Spark-Streaming-specific; maybe someone who knows more can provide better insights. I read your post on my phone and I believe my answer doesn't completely address the issue you have raised. Do you need to call the external service for every event, i.e., do you need to process all events? Does the order of processing events matter? Is there a time bound in which each event should be processed? Calling an external service means network IO, so you have to buffer events if your service is rate limited or slower than the rate at which you are processing your events. Here are some ways of dealing with this situation: 1. Drop events based on a policy (such as buffer/queue size). 2. Tell the event producer to slow down, if that's in your control. 3. Use a proxy or a set of proxies to distribute the calls to the remote service, if the rate limit is per user or network node only. I'm not sure how many of these are implemented directly in Spark Streaming, but you can have an external component that controls the rate of events and only sends events to Spark Streaming when it's ready to process more messages (a rough sketch follows this message). Hope this helps. -Soumya On Wed, Jun 18, 2014 at 6:50 PM, Flavio Pompermaier pomperma...@okkam.it wrote: Thanks for the quick reply soumya. Unfortunately I'm a newbie with Spark..what do you mean? is there any reference to how to do that? On Thu, Jun 19, 2014 at 12:24 AM, Soumya Simanta soumya.sima...@gmail.com wrote: You can add a back pressured enabled component in front that feeds data into Spark. This component can control in input rate to spark. On Jun 18, 2014, at 6:13 PM, Flavio Pompermaier pomperma...@okkam.it wrote: Hi to all, in my use case I'd like to receive events and call an external service as they pass through. Is it possible to limit the number of contemporaneous call to that service (to avoid DoS) using Spark streaming? if so, limiting the rate implies a possible buffer growth...how can I control the buffer of incoming events waiting to be processed? Best, Flavio
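A rough sketch of one simple way to cap the number of concurrent calls from inside the job itself: bound the number of partitions that do the calling, so at most that many tasks (and therefore service connections) exist per batch. The stream, the callService function, and the limit are all placeholders, not anything defined in this thread:

// a sketch; events: DStream[String] and callService are stand-ins for the user's own code
def callService(event: String): Unit = ???   // placeholder for the external call

val maxConcurrentCallers = 8
events.foreachRDD { rdd =>
  rdd.coalesce(maxConcurrentCallers).foreachPartition { batch =>
    batch.foreach(e => callService(e))   // at most maxConcurrentCallers partitions run at once
  }
}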
Re: Execution stalls in LogisticRegressionWithSGD
Hi Bharath, This is related to SPARK-1112, which we already found the root cause. I will let you know when this is fixed. Best, Xiangrui On Tue, Jun 17, 2014 at 7:37 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Couple more points: 1)The inexplicable stalling of execution with large feature sets appears similar to that reported with the news-20 dataset: http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E 2) The NPE trying to call mapToPair convert an RDDLong, Long, Integer, Integer into a JavaPairRDDTuple2Long,Long, Tuple2Integer,Integer is unrelated to mllib. Thanks, Bharath On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Xiangrui , I'm using 1.0.0. Thanks, Bharath On 18-Jun-2014 1:43 am, Xiangrui Meng men...@gmail.com wrote: Hi Bharath, Thanks for posting the details! Which Spark version are you using? Best, Xiangrui On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, (Apologies for the long mail, but it's necessary to provide sufficient details considering the number of issues faced.) I'm running into issues testing LogisticRegressionWithSGD a two node cluster (each node with 24 cores and 16G available to slaves out of 24G on the system). Here's a description of the application: The model is being trained based on categorical features x, y, and (x,y). The categorical features are mapped to binary features by converting each distinct value in the category enum into a binary feature by itself (i.e presence of that value in a record implies corresponding feature = 1, else feature = 0. So, there'd be as many distinct features as enum values) . The training vector is laid out as [x1,x2...xn,y1,y2yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the training data has only one combination (Xk,Yk) and a label appearing in the record. Thus, the corresponding labeledpoint sparse vector would only have 3 values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector (though parse) would be nearly 614000. The number of records is about 1.33 million. The records have been coalesced into 20 partitions across two nodes. The input data has not been cached. (NOTE: I do realize the records features may seem large for a two node setup, but given the memory cpu, and the fact that I'm willing to give up some turnaround time, I don't see why tasks should inexplicably fail) Additional parameters include: spark.executor.memory = 14G spark.default.parallelism = 1 spark.cores.max=20 spark.storage.memoryFraction=0.8 //No cache space required (Trying to set spark.akka.frameSize to a larger number, say, 20 didn't help either) The model training was initialized as : new LogisticRegressionWithSGD(1, maxIterations, 0.0, 0.05) However, after 4 iterations of gradient descent, the entire execution appeared to stall inexplicably. 
The corresponding executor details and details of the stalled stage (number 14) are as follows:

Metric                           | Min   | 25th  | Median | 75th  | Max
Result serialization time        | 12 ms | 13 ms | 14 ms  | 16 ms | 18 ms
Duration                         | 4 s   | 4 s   | 5 s    | 5 s   | 5 s
Time spent fetching task results | 0 ms  | 0 ms  | 0 ms   | 0 ms  | 0 ms
Scheduler delay                  | 6 s   | 6 s   | 6 s    | 6 s   | 12 s

Stage Id 14: aggregate at GradientDescent.scala:178

Task Index | Task ID | Status  | Locality Level | Executor                    | Launch Time         | Duration | GC Time | Result Ser Time | Errors
0          | 600     | RUNNING | PROCESS_LOCAL  | serious.dataone.foo.bar.com | 2014/06/17 10:32:27 | 1.1 h    |         |                 |
1          | 601     | RUNNING | PROCESS_LOCAL  | casual.dataone.foo.bar.com  | 2014/06/17 10:32:27 | 1.1 h    |         |                 |
2          | 602     | RUNNING | PROCESS_LOCAL  | serious.dataone.foo.bar.com | 2014/06/17 10:32:27 | 1.1 h    |         |                 |
3          | 603     | RUNNING | PROCESS_LOCAL  | casual.dataone.foo.bar.com  | 2014/06/17 10:32:27 | 1.1 h    |         |                 |
4          | 604     | RUNNING | PROCESS_LOCAL  | serious.dataone.foo.bar.com | 2014/06/17 10:32:27 | 1.1 h    |         |                 |
5          | 605     | SUCCESS | PROCESS_LOCAL  | casual.dataone.foo.bar.com  | 2014/06/17 10:32:27 | 4 s      | 2 s     | 12 ms           |
6          | 606     | SUCCESS | PROCESS_LOCAL  | serious.dataone.foo.bar.com | 2014/06/17 10:32:27 | 4 s      | 1 s     | 14 ms           |
7          | 607     | SUCCESS | PROCESS_LOCAL  | casual.dataone.foo.bar.com  | 2014/06/17 10:32:27 | 4 s      | 2 s     | 12 ms           |
8          | 608     | SUCCESS | PROCESS_LOCAL  | serious.dataone.foo.bar.com | 2014/06/17 10:32:27 | 5 s      | 1 s     | 15 ms           |
9          | 609     | SUCCESS | PROCESS_LOCAL  | casual.dataone.foo.bar.com  | 2014/06/17 10:32:27 | 5 s      | 1 s     | 14 ms           |
10         | 610     | SUCCESS | PROCESS_LOCAL  | serious.dataone.foo.bar.com | 2014/06/17 10:32:27 | 5 s      | 1 s     | 15 ms           |
11         | 611     | SUCCESS | PROCESS_LOCAL
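As a side note on the feature layout Bharath describes (three non-zeros per record out of roughly 614,000 slots), a small sketch of how that [x-block | y-block | (x,y)-block] one-hot encoding maps onto MLlib's sparse vectors; the cardinalities and index scheme below are made up purely for illustration:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val numX = 700                               // hypothetical number of distinct x values
val numY = 700                               // hypothetical number of distinct y values
val size = numX + numY + numX * numY         // total (mostly empty) vector length

// each record sets exactly three features: x_k, y_k, and the (x_k, y_k) cross term
def encode(label: Double, xIdx: Int, yIdx: Int): LabeledPoint = {
  val crossIdx = numX + numY + xIdx * numY + yIdx
  LabeledPoint(label, Vectors.sparse(size, Array(xIdx, numX + yIdx, crossIdx), Array(1.0, 1.0, 1.0)))
}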
Re: Execution stalls in LogisticRegressionWithSGD
Thanks. I'll await the fix to re-run my test. On Thu, Jun 19, 2014 at 8:28 AM, Xiangrui Meng men...@gmail.com wrote: Hi Bharath, This is related to SPARK-1112, which we already found the root cause. I will let you know when this is fixed. Best, Xiangrui On Tue, Jun 17, 2014 at 7:37 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Couple more points: 1)The inexplicable stalling of execution with large feature sets appears similar to that reported with the news-20 dataset: http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E 2) The NPE trying to call mapToPair convert an RDDLong, Long, Integer, Integer into a JavaPairRDDTuple2Long,Long, Tuple2Integer,Integer is unrelated to mllib. Thanks, Bharath On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi Xiangrui , I'm using 1.0.0. Thanks, Bharath On 18-Jun-2014 1:43 am, Xiangrui Meng men...@gmail.com wrote: Hi Bharath, Thanks for posting the details! Which Spark version are you using? Best, Xiangrui On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Hi, (Apologies for the long mail, but it's necessary to provide sufficient details considering the number of issues faced.) I'm running into issues testing LogisticRegressionWithSGD a two node cluster (each node with 24 cores and 16G available to slaves out of 24G on the system). Here's a description of the application: The model is being trained based on categorical features x, y, and (x,y). The categorical features are mapped to binary features by converting each distinct value in the category enum into a binary feature by itself (i.e presence of that value in a record implies corresponding feature = 1, else feature = 0. So, there'd be as many distinct features as enum values) . The training vector is laid out as [x1,x2...xn,y1,y2yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the training data has only one combination (Xk,Yk) and a label appearing in the record. Thus, the corresponding labeledpoint sparse vector would only have 3 values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector (though parse) would be nearly 614000. The number of records is about 1.33 million. The records have been coalesced into 20 partitions across two nodes. The input data has not been cached. (NOTE: I do realize the records features may seem large for a two node setup, but given the memory cpu, and the fact that I'm willing to give up some turnaround time, I don't see why tasks should inexplicably fail) Additional parameters include: spark.executor.memory = 14G spark.default.parallelism = 1 spark.cores.max=20 spark.storage.memoryFraction=0.8 //No cache space required (Trying to set spark.akka.frameSize to a larger number, say, 20 didn't help either) The model training was initialized as : new LogisticRegressionWithSGD(1, maxIterations, 0.0, 0.05) However, after 4 iterations of gradient descent, the entire execution appeared to stall inexplicably. 
The corresponding executor details and details of the stalled stage (number 14) are as follows: MetricMin25th Median75th Max Result serialization time12 ms13 ms14 ms16 ms18 ms Duration4 s4 s5 s5 s 5 s Time spent fetching task 0 ms0 ms0 ms0 ms0 ms results Scheduler delay6 s6 s6 s6 s 12 s Stage Id 14 aggregate at GradientDescent.scala:178 Task IndexTask IDStatusLocality Level Executor Launch TimeDurationGC Result Ser Time Errors Time 0 600 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com 2014/06/17 10:32:27 1.1 h 1 601 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com 2014/06/17 10:32:27 1.1 h 2 602 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com 2014/06/17 10:32:27 1.1 h 3 603 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com 2014/06/17 10:32:27 1.1 h 4 604 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com 2014/06/17 10:32:27 1.1 h 5 605 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com 2014/06/17 10:32:27 4 s 2 s 12 ms 6 606 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com 2014/06/17 10:32:27 4 s 1 s 14 ms 7 607 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com 2014/06/17 10:32:27 4 s 2 s 12 ms 8 608 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com 2014/06/17 10:32:27 5 s 1 s 15 ms 9 609 SUCCESS
Fwd: BSP realization on Spark
-- Forwarded message -- From: Ghousia ghousia.ath...@gmail.com Date: Wed, Jun 18, 2014 at 5:41 PM Subject: BSP realization on Spark To: user@spark.apache.org Hi, We are trying to implement a BSP model in Spark with the help of GraphX. One thing I encountered is a Pregel operator in Graph class. But what I fail to understand is how the Master and Worker needs to be assigned (BSP), and how barrier synchronization would happen. The pregel operator provides a way to define a vertex program, but nothing is mentioned about the barrier synchronization. Any help in this regard is truly appreciated. Many Thanks, Ghousia.
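On the barrier-synchronization part of the question: GraphX's Pregel operator runs synchronous supersteps itself — each iteration is a round of joins and message aggregation over the whole graph, so the barrier is simply the end of each iteration and there is no explicit master/worker assignment to set up. A hedged sketch of the classic single-source-shortest-paths vertex program (the toy edge list and source vertex are placeholders):

import org.apache.spark.graphx._

// a sketch; sc is an existing SparkContext and the edge weights are arbitrary
val edges = sc.parallelize(Seq(Edge(0L, 1L, 1.0), Edge(1L, 2L, 2.0), Edge(2L, 0L, 4.0)))
val graph = Graph.fromEdges(edges, defaultValue = 0L)

val sourceId: VertexId = 0L
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist),          // vertex program, run once per superstep
  triplet =>                                               // send messages along out-edges
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    else
      Iterator.empty,
  (a, b) => math.min(a, b))                                // merge messages arriving at a vertex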
options set in spark-env.sh is not reflecting on actual execution
Hi all, I have a doubt regarding the options in spark-env.sh. I set the following values in the file on the master and the 2 workers:

SPARK_WORKER_MEMORY=7g
SPARK_EXECUTOR_MEMORY=6g
SPARK_DAEMON_JAVA_OPTS+="-Dspark.akka.timeout=30 -Dspark.akka.frameSize=1 -Dspark.blockManagerHeartBeatMs=80 -Dspark.shuffle.spill=false"

But SPARK_EXECUTOR_MEMORY is showing 4g in the web UI. Do I need to change it anywhere else for the value to be reflected in the web UI? A warning is coming that blockManagerHeartBeatMs is exceeding 45 while executing a process, even though I set it to 80. So I doubt whether it should be set as SPARK_MASTER_OPTS or SPARK_WORKER_OPTS. Thanks & Regards, Meethu M
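One hedged pointer, worth verifying against the docs for your version: the spark.akka.*, spark.blockManagerHeartBeatMs, and spark.shuffle.spill keys are per-application properties, so they are normally set in the driver application's SparkConf (or in conf/spark-defaults.conf on the machine that submits the job) rather than in SPARK_MASTER_OPTS, SPARK_WORKER_OPTS, or SPARK_DAEMON_JAVA_OPTS, which configure the daemons themselves. A minimal sketch, with placeholder values:

import org.apache.spark.{SparkConf, SparkContext}

// a sketch; the app name and numbers are placeholders, and the same pattern applies
// to the heartbeat and frame-size keys mentioned above
val conf = new SparkConf()
  .setAppName("my-app")
  .set("spark.executor.memory", "6g")
  .set("spark.shuffle.spill", "false")
val sc = new SparkContext(conf)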
Re: Best practices for removing lineage of a RDD or Graph object?
Hi Roy, Thanks for your help. I wrote a small code snippet that can reproduce the problem. Could you help me read through it and see if I did anything wrong? Thanks!

def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("TEST")
    .setMaster("local[4]")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "edu.nd.dsg.hdtm.util.HDTMKryoRegistrator")
  val sc = new SparkContext(conf)
  val v = sc.parallelize(Seq[(VertexId, Long)]((0L, 0L), (1L, 1L), (2L, 2L)))
  val e = sc.parallelize(Seq[Edge[Long]](Edge(0L, 1L, 0L), Edge(1L, 2L, 1L), Edge(2L, 0L, 2L)))
  val newGraph = Graph(v, e)
  var currentGraph = newGraph
  val vertexIds = currentGraph.vertices.map(_._1).collect()

  for (i <- 1 to 1000) {
    var g = currentGraph
    vertexIds.toStream.foreach(id => {
      g = Graph(currentGraph.vertices, currentGraph.edges)
      g.cache()
      g.edges.cache()
      g.vertices.cache()
      g.vertices.count()
      g.edges.count()
    })
    currentGraph.unpersistVertices(blocking = false)
    currentGraph.edges.unpersist(blocking = false)
    currentGraph = g
    println("iter " + i + " finished")
  }
}

Baoxu Shi (Dash) Computer Science and Engineering Department University of Notre Dame b...@nd.edu On Jun 19, 2014, at 1:47 AM, roy20021 [via Apache Spark User List] ml-node+s1001560n7892...@n3.nabble.com wrote: Not sure if it can help, btw: checkpoint cuts the lineage. The checkpoint method is a flag. In order to actually perform the checkpoint, you must NOT materialise the RDD before it has been flagged, otherwise the flag is just ignored.

rdd2 = rdd1.map(..)
rdd2.checkpoint()
rdd2.count
rdd2.isCheckpointed // true

On Wednesday, June 18, 2014, dash [hidden email] wrote: If a RDD object has non-empty .dependencies, does that mean it has lineage? How could I remove it? I'm doing iterative computing and each iteration depends on the result computed in the previous iteration. After several iterations, it will throw a StackOverflowError. At first I tried to use cache: I read the code in pregel.scala, which is part of GraphX, and they use a count method to materialize the object after cache, but I attached a debugger and it seems such an approach does not empty .dependencies, and it also does not work in my code. Another alternative approach is using checkpoint: I tried to checkpoint the vertices and edges of my Graph object and then materialize it by counting vertices and edges. Then I use .isCheckpointed to check if it is correctly checkpointed, but it always returns false. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Best-practices-for-removing-lineage-of-a-RDD-or-Graph-object-tp7779.html Sent from the Apache Spark User List mailing list archive at Nabble.com. If you reply to this email, your message will be added to the discussion below: http://apache-spark-user-list.1001560.n3.nabble.com/Best-practices-for-removing-lineage-of-a-RDD-or-Graph-object-tp7779p7892.html To unsubscribe from Best practices for removing lineage of a RDD or Graph object?, click here. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Best-practices-for-removing-lineage-of-a-RDD-or-Graph-object-tp7779p7893.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
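A compact sketch of the pattern from the quoted reply — flag the checkpoint before materializing, every N iterations, so the lineage actually gets truncated instead of growing until a StackOverflowError. The checkpoint directory and interval are placeholders; for a Graph, the same calls would go on its vertices and edges RDDs:

// a sketch, assuming an existing SparkContext sc and a reachable checkpoint directory
sc.setCheckpointDir("hdfs:///tmp/checkpoints")   // placeholder path

var current = sc.parallelize(1 to 1000)
for (i <- 1 to 1000) {
  current = current.map(_ + 1)
  if (i % 50 == 0) {
    current.checkpoint()   // flag BEFORE materializing, as the reply notes
    current.count()        // materialization writes the checkpoint and cuts the lineage
  }
}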