RE: Issue when rebroadcasting a variable outside of the definition scope
Simone, here are some thoughts.

First, please check out the "Understanding closures" section of the Spark Programming Guide. Second, broadcast variables do not propagate updates to the underlying data. You must either create a new broadcast variable or, if you simply wish to accumulate results, use an Accumulator that stores an array or queue as a buffer, which you then read from and write to Kafka. You should also be able to send the results to a new DStream instead, and link that DStream to Kafka.

Hope this gives you some ideas to play with. Thanks!

Thank you,
Ilya Ganelin

-----Original Message-----
From: simone.robutti [simone.robu...@gmail.com]
Sent: Friday, August 07, 2015 10:07 AM Eastern Standard Time
To: user@spark.apache.org
Subject: Issue when rebroadcasting a variable outside of the definition scope

Hello everyone,

this is my first message ever to a mailing list, so please pardon me if I'm violating the etiquette.

I have a problem with rebroadcasting a variable. How it should work is not well documented, so I could find only a few simple examples to understand it.

What I'm trying to do is propagate an update to the options that control the behaviour of my streaming transformations (in this case, the evaluation of machine learning models). I have a listener on a Kafka queue that waits for messages and updates the broadcast variable.

I made it work, but the system doesn't rebroadcast anything if I pass the DStream or the broadcast variable as a parameter. So both must be defined in the same scope, and the rebroadcasting must happen in that same scope.

Right now my main function looks like this:

    var updateVar = sc.broadcast(test)
    val stream = input.map(x => myTransformation(x, updateVar))
    stream.writeToKafka[String, String](outputProps,
      (m: String) => new KeyedMessage[String, String](configuration.outputTopic, m + updateVar.value))

    val controlStream = connector.createMessageStreamsByFilter(filterSpec, 1,
      new DefaultDecoder(), new StringDecoder())(0)
    for (messageAndTopic <- controlStream) {
      println("ricevo")  // Italian for "I receive"
      updateVar.unpersist()
      updateVar = ssc.sparkContext.broadcast(messageAndTopic.message)
    }

    ssc.start()
    ssc.awaitTermination()

updateVar is correctly updated both in myTransformation and in the main scope, and I can access the updated value.

But when I try to move the logic to a class, it fails. I have something like this (or the same queue listener as before, but moved to another class):

    class Listener(var updateVar: Broadcast[String]) {
      ...
      def someFunc() = {
        updateVar.unpersist()
        updateVar = sc.broadcast("new value")
      }
      ...
    }

This fails: the variable can be destroyed but cannot be updated. Any suggestions on why it behaves this way? I would also like to know how Spark notices the reassignment to the var and starts the rebroadcasting.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issue-when-rebroadcasting-a-variable-outside-of-the-definition-scope-tp24172.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
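A note on the Listener behaviour described in this thread: passing the broadcast handle into a class copies the reference, so reassigning the class's own field leaves the caller's variable pointing at the old broadcast, which is likely why the update never becomes visible. One possible way around it, sketched here in PySpark, is to share a single mutable holder between the listener and the streaming code and to look the handle up once per batch inside transform(). BroadcastHolder and apply_option are hypothetical names, not Spark APIs; the sketch also assumes checkpointing is not in use.

```python
import threading


class BroadcastHolder:
    """Hypothetical helper: one shared, mutable reference to the current broadcast."""

    def __init__(self, sc, initial_value):
        self._sc = sc
        self._lock = threading.Lock()
        self._current = sc.broadcast(initial_value)

    def update(self, new_value):
        # Broadcasts are read-only, so an "update" means creating a new one
        # and releasing the executors' copies of the old one.
        fresh = self._sc.broadcast(new_value)
        with self._lock:
            stale, self._current = self._current, fresh
        stale.unpersist()

    def current(self):
        with self._lock:
            return self._current


def apply_option(stream, holder):
    """Tag every record with whatever option is current when its batch starts."""

    def per_batch(rdd):
        # transform() calls this on the driver for each batch, so the handle
        # is looked up fresh every time instead of being captured once.
        option = holder.current()
        return rdd.map(lambda record: (record, option.value))

    return stream.transform(per_batch)
```

The Kafka listener would then call holder.update(message) and never touch a variable owned by another scope.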
RE: How to read gzip data in Spark - Simple question
Have you tried reading the Spark documentation? http://spark.apache.org/docs/latest/programming-guide.html

Thank you,
Ilya Ganelin

-----Original Message-----
From: ÐΞ€ρ@Ҝ (๏̯͡๏) [deepuj...@gmail.com]
Sent: Thursday, August 06, 2015 12:41 AM Eastern Standard Time
To: Philip Weaver
Cc: user
Subject: Re: How to read gzip data in Spark - Simple question

How do I persist the RDD to HDFS?

On Wed, Aug 5, 2015 at 8:32 PM, Philip Weaver <philip.wea...@gmail.com> wrote:

This message means that java.util.Date is not supported by Spark DataFrame. You'll need to use java.sql.Date, I believe.

On Wed, Aug 5, 2015 at 8:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

That seems to be working. However, I see a new exception.

Code:

    def formatStringAsDate(dateStr: String) = new SimpleDateFormat("yyyy-MM-dd").parse(dateStr)

    //(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
    val rowStructText = sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz")
    case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5: String, f6: Integer,
      f7: Date, f8: Date, f9: Integer, f10: Integer, f11: Float, f12: Integer, f13: Integer, f14: String)

    val summary = rowStructText.map(s => s.split(",")).map(
      s => Summary(formatStringAsDate(s(0)),
        s(1).replaceAll("\"", "").toLong,
        s(3).replaceAll("\"", "").toLong,
        s(4).replaceAll("\"", "").toInt,
        s(5).replaceAll("\"", ""),
        s(6).replaceAll("\"", "").toInt,
        formatStringAsDate(s(7)),
        formatStringAsDate(s(8)),
        s(9).replaceAll("\"", "").toInt,
        s(10).replaceAll("\"", "").toInt,
        s(11).replaceAll("\"", "").toFloat,
        s(12).replaceAll("\"", "").toInt,
        s(13).replaceAll("\"", "").toInt,
        s(14).replaceAll("\"", "")
      )
    ).toDF()
    bank.registerTempTable("summary")

Output:

    import java.text.SimpleDateFormat
    import java.util.Calendar
    import java.util.Date
    formatStringAsDate: (dateStr: String)java.util.Date
    rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz MapPartitionsRDD[105] at textFile at <console>:60
    defined class Summary
    x: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at <console>:61
    java.lang.UnsupportedOperationException: Schema for type java.util.Date is not supported
      at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188)
      at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
      at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164)

Any suggestions?

On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver <philip.wea...@gmail.com> wrote:

The parallelize method does not read the contents of a file. It simply takes a collection and distributes it to the cluster. In this case, the String is a collection of 67 characters.

Use sc.textFile instead of sc.parallelize, and it should work as you want.

On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

I have CSV data that is stored in gzip format on HDFS.

With Pig:

    a = load '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz' using PigStorage();
    b = limit a 10

    (2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,)
    (2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,)

However, with Spark:

    val rowStructText = sc.parallelize("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz")
    val x = rowStructText.map(s => {
      println(s)
      s
    })
    x.count

Questions:

1) x.count always shows 67, irrespective of the path I pass to sc.parallelize.
2) It shows x as RDD[Char] instead of String.
3) println() never emits the rows.

Any suggestions?

-Deepak

--
Deepak
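To make the points in this thread concrete, here is a small PySpark-flavoured sketch. characters_in shows why parallelize on a path string yields one element per character, and parse_summary is a hypothetical parser for a few columns of the sample row that returns datetime.date values (the Scala counterpart suggested above is java.sql.Date). save_summaries addresses the unanswered question about writing an RDD back to HDFS. All function names and the choice of columns are assumptions for illustration; only textFile, map and saveAsTextFile are Spark calls.

```python
from datetime import datetime


def characters_in(path):
    # A string is itself a collection, so parallelize(path) would distribute
    # its characters; the file the path names is never opened.
    return list(path)


def parse_date(text):
    return datetime.strptime(text, "%Y-%m-%d").date()


def parse_summary(line):
    """Parse columns 0, 1, 3, 4, 5 and 7 of one CSV row (column 2 is empty in the sample)."""
    cols = line.split(",")
    return (parse_date(cols[0]), int(cols[1]), int(cols[3]),
            int(cols[4]), cols[5], parse_date(cols[7]))


def load_summaries(sc, path):
    # textFile reads the file itself and decompresses .gz input transparently.
    return sc.textFile(path).map(parse_summary)


def save_summaries(rdd, out_dir):
    # Writes one part file per partition under out_dir (for example an hdfs:// URI).
    rdd.map(lambda row: ",".join(str(v) for v in row)).saveAsTextFile(out_dir)
```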
RE: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
To be unpersisted, the RDD must be persisted first. If the storage level is set to NONE, then the RDD is not persisted, and as such does not need to be freed. Does that make sense?

Thank you,
Ilya Ganelin

-----Original Message-----
From: Stahlman, Jonathan [jonathan.stahl...@capitalone.com]
Sent: Wednesday, July 22, 2015 01:42 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into the source code and found what may be a bug. Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

you see that ALS.train() is being called with finalRDDStorageLevel = StorageLevel.NONE, which I would understand to mean that the intermediate RDDs will not be persisted. Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

This doesn't make sense to me: I would expect the RDDs to be removed from the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around.

Jonathan

From: Stahlman, Jonathan <jonathan.stahl...@capitalone.com>
Date: Thursday, July 16, 2015 at 2:18 PM
To: user@spark.apache.org
Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been studying its output with various model configurations. Ideally I would like to be able to run one job that trains the recommendation model with many different configurations to try to optimize for performance. Sample code in Python is copied below.

The issue I have is that each newly trained model caches a set of RDDs, and eventually the executors run out of memory. Is there any way in PySpark to unpersist() these RDDs after each iteration? The names of the RDDs, which I gathered from the UI, are:

    itemInBlocks
    itemOutBlocks
    Products
    ratingBlocks
    userInBlocks
    userOutBlocks
    users

I am using Spark 1.3. Thank you for any help!

Regards,
Jonathan

    import itertools
    from pyspark.mllib.recommendation import ALS, Rating

    data_train, data_cv, data_test = data.randomSplit([99, 1, 1], 2)
    functions = [rating]  # defined elsewhere
    ranks = [10, 20]
    iterations = [10, 20]
    lambdas = [0.01, 0.1]
    alphas = [1.0, 50.0]

    results = []
    for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product(
            functions, ranks, iterations, lambdas, alphas):
        # train model
        ratings_train = data_train.map(lambda l: Rating(l.user, l.product, ratingFunction(l)))
        model = ALS.trainImplicit(ratings_train, rank, numIterations,
                                  lambda_=float(m_lambda), alpha=float(m_alpha))

        # test performance on CV data
        ratings_cv = data_cv.map(lambda l: Rating(l.user, l.product, ratingFunction(l)))
        auc = areaUnderCurve(ratings_cv, model.predictAll)

        # save results
        result = ",".join(str(l) for l in [ratingFunction.__name__, rank, numIterations, m_lambda, m_alpha, auc])
        results.append(result)
Real-time data visualization with Zeppelin
Hi all,

I'm just wondering if anyone has had success integrating Spark Streaming with Zeppelin and actually dynamically updating the data in near real-time. From my investigation, it seems that Zeppelin will only allow you to display a snapshot of data, not a continuously updating table. Has anyone figured out whether there's a way to loop a display command, or some other mechanism to continuously update visualizations?

Thank you,
Ilya Ganelin
RE: Making Unpersist Lazy
You may pass an optional parameter (blocking = false) so that the call returns immediately instead of waiting for the blocks to be removed.

Thank you,
Ilya Ganelin

-----Original Message-----
From: Jem Tucker [jem.tuc...@gmail.com]
Sent: Thursday, July 02, 2015 04:06 AM Eastern Standard Time
To: Akhil Das
Cc: user
Subject: Re: Making Unpersist Lazy

Hi,

After running some tests, it appears that unpersist is executed as soon as it is reached, so any tasks using this RDD later on will have to recalculate it. This is fine for simple programs, but when an RDD is created within a function and its reference is then lost while its children continue to be used, persist/unpersist does not work effectively.

Thanks,
Jem

On Thu, 2 Jul 2015 at 08:18, Akhil Das <ak...@sigmoidanalytics.com> wrote:

RDDs which are no longer required will be removed from memory by Spark itself (which you can consider as lazy?).

Thanks
Best Regards

On Wed, Jul 1, 2015 at 7:48 PM, Jem Tucker <jem.tuc...@gmail.com> wrote:

Hi,

The current behavior of rdd.unpersist() appears not to be lazily executed, and it therefore must be placed after an action. Is there any way to emulate lazy execution of this function so that it is added to the task queue?

Thanks,
Jem
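One pattern that may sidestep the ordering problem Jem describes: since unpersist takes effect when it is called, materialize the child RDD first and only then release the parent. A minimal sketch, where materialize_then_release is a hypothetical helper and persist, count and unpersist are the standard RDD methods:

```python
def materialize_then_release(parent, derive):
    """Cache the RDD derived from `parent`, force it, then drop the parent's cache."""
    child = derive(parent).persist()
    child.count()        # action: computes the child while the parent is still cached
    parent.unpersist()   # later jobs read the cached child instead of the parent
    return child
```

The trade-off is one extra action (the count) per derived RDD, in exchange for never recomputing the parent after its cache is gone.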
RE: Performing sc.paralleize (..) in workers not in the driver program
The parallelize operation accepts as input a data structure in memory. When you call it, you are necessarily operating in the memory space of the driver, since that is where user code executes. Until you have an RDD, you can't really operate in a distributed way.

If your files are stored in a distributed file system such as HDFS, then you can create an RDD from those directly with sc.textFile(...).

Thank you,
Ilya Ganelin

-----Original Message-----
From: shahab [shahab.mok...@gmail.com]
Sent: Thursday, June 25, 2015 12:46 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Performing sc.paralleize (..) in workers not in the driver program

Hi,

Apparently, the sc.parallelize(..) operation is performed in the driver program, not in the workers! Is it possible to do this in a worker process for the sake of scalability?

best
/Shahab
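If the data can be generated rather than loaded, one common workaround for shahab's question is to parallelize only a small list of seeds and let each task build its own share of the data on the executors. A sketch under that assumption; expand_seed and build_on_workers are hypothetical names, and the generated integers are placeholders for real records:

```python
def expand_seed(seed, per_seed):
    """Runs on an executor: turn one small seed into `per_seed` records."""
    start = seed * per_seed
    return [start + offset for offset in range(per_seed)]


def build_on_workers(sc, num_seeds, per_seed):
    # Only the seeds (a few integers) travel from the driver; the bulk of the
    # data is created inside the flatMap tasks.
    seeds = sc.parallelize(range(num_seeds), num_seeds)
    return seeds.flatMap(lambda seed: expand_seed(seed, per_seed))
```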
RE: Matrix Multiplication and mllib.recommendation
I actually talk about this exact thing in a blog post here: http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/

Keep in mind, you're actually doing a ton of math. Even with proper caching and use of broadcast variables, this will take a while depending on the size of your cluster. To get real results you may want to look into locality-sensitive hashing to limit your search space, and definitely look into spinning up multiple threads to process your product features in parallel to increase resource utilization on the cluster.

Thank you,
Ilya Ganelin

-----Original Message-----
From: afarahat [ayman.fara...@yahoo.com]
Sent: Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Matrix Multiplication and mllib.recommendation

Hello;

I am trying to get predictions after running the ALS model. The model works fine. In the prediction/recommendation step, I have about 30,000 products and 90 million users. When I try predictAll, it fails.

I have been trying to formulate the problem as a matrix multiplication, where I first get the product features, broadcast them, and then do a dot product. It's still very slow. Any reason why? Here is some sample code:

    def doMultiply(x):
        a = []
        # multiply by
        mylen = len(pf.value)
        for i in range(mylen):
            myprod = numpy.dot(x, pf.value[i][1])
            a.append(myprod)
        return a

    myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
    # I need to select which products to broadcast but let's try all
    m1 = myModel.productFeatures().sample(False, 0.001)
    pf = sc.broadcast(m1.collect())
    uf = myModel.userFeatures()
    f1 = uf.map(lambda x: (x[0], doMultiply(x[1])))

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-Multiplication-and-mllib-recommendation-tp23384.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
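The inner Python loop in doMultiply performs one numpy.dot call per product. Assuming the broadcast value is a list of (productId, featureVector) pairs as in the code above, the features could instead be stacked into a single matrix once, so that each user needs only one matrix-vector product; keeping just the top k scores also avoids emitting a score for every product. stack_features and top_k_products are hypothetical helper names, and this is a sketch rather than a measured fix.

```python
import numpy as np


def stack_features(product_features):
    """Split [(productId, vector), ...] into an id array and an (n_products, rank) matrix."""
    ids = np.array([pid for pid, _ in product_features])
    matrix = np.array([vec for _, vec in product_features], dtype=float)
    return ids, matrix


def top_k_products(user_vector, ids, matrix, k):
    """Score every product with one matrix-vector product and keep the k best."""
    scores = matrix.dot(np.asarray(user_vector, dtype=float))
    best = np.argsort(-scores)[:k]
    return [(int(ids[i]), float(scores[i])) for i in best]
```

In the job above, one would broadcast the (ids, matrix) pair instead of the raw list and map userFeatures() through top_k_products.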
RE: Does long-lived SparkContext hold on to executor resources?
Also check out the spark.cleaner.ttl property. Otherwise, you will accumulate shuffle metadata in the memory of the driver.

Sent with Good (www.good.com)

-----Original Message-----
From: Silvio Fiorito [silvio.fior...@granturing.com]
Sent: Monday, May 11, 2015 01:03 PM Eastern Standard Time
To: stanley; user@spark.apache.org
Subject: Re: Does long-lived SparkContext hold on to executor resources?

You want to look at dynamic resource allocation, here: http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

On 5/11/15, 11:23 AM, stanley <wangshua...@yahoo.com> wrote:

I am building an analytics app with Spark. I plan to use long-lived SparkContexts to minimize the overhead of creating Spark contexts, which in turn reduces the analytics query response time.

The number of queries that are run in the system is relatively small each day. Would long-lived contexts hold on to the executor resources when there are no queries running? Is there a way to free executor resources in this type of use case?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-long-lived-SparkContext-hold-on-to-executor-resources-tp22848.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
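For reference, the two suggestions in this thread translate into a handful of configuration properties. The property names below are the ones documented for Spark 1.x; the values are placeholders rather than recommendations, and apply_settings is a hypothetical helper that works with any object exposing set(key, value), such as pyspark.SparkConf.

```python
LONG_LIVED_CONTEXT_SETTINGS = {
    # Release idle executors and request new ones on demand.
    "spark.dynamicAllocation.enabled": "true",
    # Dynamic allocation needs the external shuffle service on each node.
    "spark.shuffle.service.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "0",           # placeholder value
    "spark.dynamicAllocation.executorIdleTimeout": "60",   # seconds, placeholder
    # Periodically forget old metadata on the driver (Spark 1.x property).
    "spark.cleaner.ttl": "3600",                           # seconds, placeholder
}


def apply_settings(conf, settings):
    """Copy every key/value pair onto the given configuration object."""
    for key, value in settings.items():
        conf.set(key, value)
    return conf
```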
RE: ReduceByKey and sorting within partitions
Marco - why do you want data sorted both within and across partitions? If you need to take an ordered sequence across all your data, you need to either aggregate your RDD on the driver and sort it, or use zipWithIndex to apply an ordered index to your data that matches the order in which it was stored on HDFS. You can then get the data in order by filtering based on that index. Let me know if that's not what you need - thanks!

Sent with Good (www.good.com)

-----Original Message-----
From: Marco [marcope...@gmail.com]
Sent: Monday, April 27, 2015 07:01 AM Eastern Standard Time
To: user@spark.apache.org
Subject: ReduceByKey and sorting within partitions

Hi,

I'm trying, after reducing by key, to get data ordered among partitions (like RangePartitioner) and within partitions (like sortByKey or repartitionAndSortWithinPartitions), pushing the sorting down to the shuffle machinery of the reducing phase.

I think, but maybe I'm wrong, that the correct way to do that is for combineByKey to call the setKeyOrdering function on the ShuffledRDD that it returns.

Am I wrong? Can it be done by a combination of other transformations with the same efficiency?

Thanks,
Marco
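As a rough illustration of the layout Marco describes, the sketch below models a range partitioner with a sorted list of upper bounds and hands it to repartitionAndSortWithinPartitions, which sorts inside the shuffle. Note that this is a second shuffle after reduceByKey rather than a fused one, so it does not settle the efficiency question. range_partition and the bounds are illustrative assumptions; Spark's own RangePartitioner samples the data to choose its bounds.

```python
from bisect import bisect_left
from operator import add


def range_partition(key, upper_bounds):
    """Index of the first range whose upper bound is >= key; larger keys go to the last range."""
    return bisect_left(upper_bounds, key)


def reduce_then_range_sort(pairs, upper_bounds):
    """Sum values per key, then lay keys out in ranges, sorted inside each partition."""
    reduced = pairs.reduceByKey(add)
    return reduced.repartitionAndSortWithinPartitions(
        numPartitions=len(upper_bounds) + 1,
        partitionFunc=lambda key: range_partition(key, upper_bounds),
    )
```

In many cases reduceByKey followed by sortByKey should give a similar layout, since sortByKey range-partitions the keys and sorts each partition.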
RE: Spark 1.3.1 Hadoop 2.4 Prebuilt package broken ?
What command are you using to untar? Are you running out of disk space?

Sent with Good (www.good.com)

-----Original Message-----
From: ÐΞ€ρ@Ҝ (๏̯͡๏) [deepuj...@gmail.com]
Sent: Monday, April 27, 2015 11:44 AM Eastern Standard Time
To: user
Subject: Spark 1.3.1 Hadoop 2.4 Prebuilt package broken ?

I downloaded the 1.3.1 Hadoop 2.4 prebuilt package (tar) from multiple mirrors and from the direct link. Each time I untar it, I get the error below:

    spark-1.3.1-bin-hadoop2.4/lib/spark-assembly-1.3.1-hadoop2.4.0.jar: (Empty error message)
    tar: Error exit delayed from previous errors

Is it broken?

--
Deepak
RE: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?
If you're reading a file line by line, then you should simply use the Hadoop FileSystem class from Java to read the file with a BufferedInputStream. I don't think you need an RDD here.

Sent with Good (www.good.com)

-----Original Message-----
From: Michal Michalski [michal.michal...@boxever.com]
Sent: Friday, April 24, 2015 11:04 AM Eastern Standard Time
To: Ganelin, Ilya
Cc: Spico Florin; user
Subject: Re: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?

The problem I'm facing is that I need to process lines from the input file in the order they're stored in the file, as they define the order of updates I need to apply to some data, and these updates are not commutative, so order matters. Unfortunately the input is purely order-based: there's no timestamp per line etc. in the file, and I'd prefer to avoid preparing the file in advance by adding ordinals before / after each line. Of the approaches you suggested, the first two won't work, as there's nothing I could sort by. I'm not sure about the third one - I'm just not sure what you meant there, to be honest :-)

Kind regards,
Michał Michalski,
michal.michal...@boxever.com

On 24 April 2015 at 15:48, Ganelin, Ilya <ilya.gane...@capitalone.com> wrote:

Michal - you need to sort your RDD. Check out the shuffle documentation in the Spark Programming Guide. It talks about this specifically. You can resolve this in a couple of ways: by collecting your RDD and sorting it, by using sortBy, or by not worrying about the internal ordering. You can still extract elements in order by using a filter with the zip, e.g.

    RDD.filter(s => s._2 < 50).sortBy(_._1)

Sent with Good (www.good.com)

-----Original Message-----
From: Michal Michalski [michal.michal...@boxever.com]
Sent: Friday, April 24, 2015 10:41 AM Eastern Standard Time
To: Spico Florin
Cc: user
Subject: Re: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?

Of course, after you do it, you probably want to call repartition(somevalue) on your RDD to get your parallelism back.

Kind regards,
Michał Michalski,
michal.michal...@boxever.com

On 24 April 2015 at 15:28, Michal Michalski <michal.michal...@boxever.com> wrote:

I did a quick test as I was curious about it too. I created a file with numbers from 0 to 999, in order, line by line. Then I did:

    scala> val numbers = sc.textFile("./numbers.txt")
    scala> val zipped = numbers.zipWithUniqueId
    scala> zipped.foreach(i => println(i))

The expected result, if the order was preserved, would be something like (0, 0), (1, 1) etc. Unfortunately, the output looks like this:

    (126,1)
    (223,2)
    (320,3)
    (1,0)
    (127,11)
    (2,10)
    (...)

The workaround I found that works for my specific use case (relatively small input files) is explicitly setting the number of partitions to 1 when reading a single *text* file:

    scala> val numbers_sp = sc.textFile("./numbers.txt", 1)

Then the output is exactly as I would expect.

I didn't dive into the code too much, but I took a very quick look at it and figured out - correct me if I missed something, it's Friday afternoon! ;-) - that this workaround will work fine for all the input formats inheriting from org.apache.hadoop.mapred.FileInputFormat, including TextInputFormat, of course - see the implementation of the getSplits() method there ( http://grepcode.com/file/repo1.maven.org/maven2/org.jvnet.hudson.hadoop/hadoop-core/0.19.1-hudson-2/org/apache/hadoop/mapred/FileInputFormat.java#FileInputFormat.getSplits%28org.apache.hadoop.mapred.JobConf%2Cint%29 ). The numSplits variable passed there is exactly the same value as you provide as the second argument to textFile, which is minPartitions. However, while *min* suggests that we can only define a minimal number of partitions and have no control over the max, from what I can see in the code, that value specifies the *exact* number of partitions per the FileInputFormat.getSplits implementation. Of course it can differ for other input formats, but in this case it should work just fine.

Kind regards,
Michał Michalski,
michal.michal...@boxever.com

On 24 April 2015 at 14:05, Spico Florin <spicoflo...@gmail.com> wrote:

Hello!

I know that HadoopRDD partitions are built based on the number of splits in HDFS. I'm wondering if these partitions preserve the initial order of data in the file. As an example, if I have an HDFS file (myTextFile) that has these splits:

    split 0 - line 1, ..., line k
    split 1 - line k+1, ..., line k+n
    split 2 - line k+n, line k+n+m

and the code

    val lines = sc.textFile("hdfs://mytextFile")
    lines.zipWithIndex()

will the order of lines be preserved? (line 1, zipIndex 1), .. (line k, zipIndex k), and so on.

I found
RE: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?
Michal - you need to sort your RDD. Check out the shuffle documentation in the Spark Programming Guide; it covers this specifically. You can resolve this in a couple of ways: by collecting your RDD and sorting it, by using sortBy, or by not relying on the internal ordering at all. You can still extract elements in order by using a filter with the zip, e.g.

    RDD.filter(s => s._2 < 50).sortBy(_._1)

-----Original Message-----
From: Michal Michalski [michal.michal...@boxever.com]
Sent: Friday, April 24, 2015 10:41 AM Eastern Standard Time
To: Spico Florin
Cc: user
Subject: Re: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?

Of course, after you do it, you probably want to call repartition(somevalue) on your RDD to get your parallelism back.

Kind regards,
Michał Michalski,
michal.michal...@boxever.com

On 24 April 2015 at 15:28, Michal Michalski <michal.michal...@boxever.com> wrote:

I did a quick test as I was curious about it too. I created a file with numbers from 0 to 999, in order, line by line. Then I did:

    scala> val numbers = sc.textFile("./numbers.txt")
    scala> val zipped = numbers.zipWithUniqueId
    scala> zipped.foreach(i => println(i))

The expected result, if the order was preserved, would be something like (0, 0), (1, 1), etc. Unfortunately, the output looks like this:

    (126,1)
    (223,2)
    (320,3)
    (1,0)
    (127,11)
    (2,10)
    (...)

The workaround that works for my specific use case (relatively small input files) is explicitly setting the number of partitions to 1 when reading a single *text* file:

    scala> val numbers_sp = sc.textFile("./numbers.txt", 1)

Then the output is exactly as I would expect.

I didn't dive into the code too much, but I took a very quick look at it and figured out - correct me if I missed something, it's Friday afternoon! ;-) - that this workaround will work fine for all the input formats inheriting from org.apache.hadoop.mapred.FileInputFormat, including TextInputFormat, of course. See the implementation of the getSplits() method there ( http://grepcode.com/file/repo1.maven.org/maven2/org.jvnet.hudson.hadoop/hadoop-core/0.19.1-hudson-2/org/apache/hadoop/mapred/FileInputFormat.java#FileInputFormat.getSplits%28org.apache.hadoop.mapred.JobConf%2Cint%29 ). The numSplits variable passed there is exactly the value you provide as the second argument to textFile, which is minPartitions. However, although *min* suggests that we can only set a minimum number of partitions and have no control over the maximum, from what I can see in the code that value specifies the *exact* number of partitions in the FileInputFormat.getSplits implementation. Of course it can differ for other input formats, but in this case it should work just fine.

Kind regards,
Michał Michalski,
michal.michal...@boxever.com

On 24 April 2015 at 14:05, Spico Florin <spicoflo...@gmail.com> wrote:

Hello!

I know that HadoopRDD partitions are built based on the number of splits in HDFS. I'm wondering if these partitions preserve the initial order of data in the file. As an example, if I have an HDFS file (myTextFile) that has these splits:

    split 0 -> line 1, ..., line k
    split 1 -> line k+1, ..., line k+n
    split 2 -> line k+n, line k+n+m

and the code

    val lines = sc.textFile("hdfs://mytextFile")
    lines.zipWithIndex()

will the order of lines be preserved? (line 1, zipIndex 1), ..., (line k, zipIndex k), and so on.

I found this question on stackoverflow (http://stackoverflow.com/questions/26046410/how-can-i-obtain-an-element-position-in-sparks-rdd) whose answer intrigued me: "Essentially, RDD's zipWithIndex() method seems to do this, but it won't preserve the original ordering of the data the RDD was created from".

Can you please confirm that this is the correct answer?

Thanks.
Florin
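The ids in the quick test above are consistent with how zipWithUniqueId is documented to number items: with n partitions, item k of partition p gets id k * n + p, so consecutive lines in one partition get ids that are n apart. The following is only a plain-Python model of that numbering next to zipWithIndex-style numbering; it needs no Spark, the helper names are made up for illustration, and the choice of ten evenly sized partitions is an assumption (it matches ids 1 and 11 landing on consecutive lines, but real text-file splits are cut by bytes, not by line count).

```python
def split_into_partitions(items, num_partitions):
    """Slice a list into contiguous chunks, one per simulated partition."""
    n = len(items)
    return [items[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]


def zip_with_unique_id(partitions):
    """Model of zipWithUniqueId: item k of partition p gets id k * n + p."""
    n = len(partitions)
    return [[(item, k * n + p) for k, item in enumerate(part)]
            for p, part in enumerate(partitions)]


def zip_with_index(partitions):
    """Model of zipWithIndex: ids follow partition order, then order inside a partition."""
    result, offset = [], 0
    for part in partitions:
        result.append([(item, offset + k) for k, item in enumerate(part)])
        offset += len(part)
    return result


# Assumed setup: lines 0..999 spread evenly over ten partitions.
partitions = split_into_partitions(list(range(1000)), 10)
unique = zip_with_unique_id(partitions)
indexed = zip_with_index(partitions)

print(unique[1][:2])   # first two items of partition 1 with their unique ids
print(indexed[1][:2])  # the same items with their positional index
```

In this model the unique ids are unique but not positional, while the index equals the line position as long as the partitions themselves are in file order.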
RE: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?
To maintain the order you can use zipWithIndex, as Sean Owen pointed out. It is the same as zipWithUniqueId except that the assigned number is the index of the element in the RDD, which I believe matches the order of the data as it's stored on HDFS.

-----Original Message-----
From: Michal Michalski [michal.michal...@boxever.com]
Sent: Friday, April 24, 2015 11:18 AM Eastern Standard Time
To: Ganelin, Ilya
Cc: Spico Florin; user
Subject: Re: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?

I read it one by one as I need to maintain the order, but it doesn't mean that I process them one by one later. Input lines refer to different entities I update, so once I read them in order, I group them by the id of the entity I want to update, sort the updates on a per-entity basis and process them further in parallel (including writing data to C* and Kafka at the very end). That's what I use Spark for; the first step I ask about is just a requirement related to the input format I get and need to support. Everything that happens after that is just a normal data processing job that you want to distribute.

Kind regards,
Michał Michalski,
michal.michal...@boxever.com

On 24 April 2015 at 16:10, Ganelin, Ilya <ilya.gane...@capitalone.com> wrote:

If you're reading a file line by line then you should simply use Hadoop's Java FileSystem class to read the file with a BufferedInputStream. I don't think you need an RDD here.

-----Original Message-----
From: Michal Michalski [michal.michal...@boxever.com]
Sent: Friday, April 24, 2015 11:04 AM Eastern Standard Time
To: Ganelin, Ilya
Cc: Spico Florin; user
Subject: Re: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?

The problem I'm facing is that I need to process lines from the input file in the order they're stored in the file, as they define the order of updates I need to apply to some data, and these updates are not commutative, so the order matters. Unfortunately the input is purely order-based: there's no timestamp per line etc. in the file, and I'd prefer to avoid preparing the file in advance by adding ordinals before / after each line. Of the approaches you suggested, the first two won't work as there's nothing I could sort by. I'm not sure about the third one - I'm just not sure what you meant there, to be honest :-)

Kind regards,
Michał Michalski,
michal.michal...@boxever.com
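The pipeline Michał describes (read in order, group by entity, sort each entity's updates, then process entities in parallel) can be sketched roughly as follows. This is plain Python standing in for the Spark job, not his code: the "entity_id,update" line format and all variable names are assumptions made for the illustration, and the positional index plays the role that zipWithIndex would play.

```python
from collections import defaultdict

# Hypothetical input: "entity_id,update" lines in file order.
lines = ["a,set 1", "b,set 5", "a,add 2", "b,mul 3", "a,mul 10"]

# Step 1: attach each line's position in the file (the role of zipWithIndex).
indexed = [(line, idx) for idx, line in enumerate(lines)]

# Step 2: group by entity id. A distributed group-by gives no ordering
# guarantee, which reversed() imitates here.
grouped = defaultdict(list)
for line, idx in reversed(indexed):
    entity, update = line.split(",", 1)
    grouped[entity].append((idx, update))

# Step 3: restore the per-entity order by sorting on the saved index.
ordered = {entity: [update for _, update in sorted(updates)]
           for entity, updates in grouped.items()}

print(ordered)
```

Once every update carries its original position, the non-commutative updates only need to be ordered within an entity, so the entities themselves can be handled independently.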
RE: Map Question
You need to expose that variable the same way you'd expose any other variable in Python that you want to see across modules. As long as you share a Spark context, all will work as expected.

http://stackoverflow.com/questions/142545/python-how-to-make-a-cross-module-variable

-----Original Message-----
From: Vadim Bichutskiy [vadim.bichuts...@gmail.com]
Sent: Thursday, April 23, 2015 12:00 PM Eastern Standard Time
To: Tathagata Das
Cc: user@spark.apache.org
Subject: Re: Map Question

Here it is. How do I access a broadcastVar in a function that's in another module (process_stuff.py below)?

Thanks,
Vadim

main.py
-------

    from pyspark import SparkContext, SparkConf
    from pyspark.streaming import StreamingContext
    from pyspark.sql import SQLContext
    from process_stuff import myfunc
    from metadata import get_metadata

    conf = SparkConf().setAppName('My App').setMaster('local[4]')
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 30)
    sqlContext = SQLContext(sc)

    distFile = ssc.textFileStream("s3n://...")
    distFile.foreachRDD(process)

    mylist = get_metadata()

    print 'BROADCASTING...'
    broadcastVar = sc.broadcast(mylist)
    print broadcastVar
    print broadcastVar.value
    print 'FINISHED BROADCASTING...'
    ## mylist and broadcastVar, broadcastVar.value print fine

    def getSqlContextInstance(sparkContext):
        if ('sqlContextSingletonInstance' not in globals()):
            globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
        return globals()['sqlContextSingletonInstance']

    def process(rdd):
        sqlContext = getSqlContextInstance(rdd.context)
        if rdd.take(1):
            jsondf = sqlContext.jsonRDD(rdd)
            #jsondf.printSchema()
            jsondf.registerTempTable('mytable')
            stuff = sqlContext.sql("SELECT ...")
            stuff_mapped = stuff.map(myfunc)  ## I want myfunc to see mylist from above?
            ...

process_stuff.py
----------------

    def myfunc(x):
        metadata = broadcastVar.value  # NameError: broadcastVar not found -- HOW TO FIX?
        ...

metadata.py
-----------

    def get_metadata():
        ...
        return mylist

On Wed, Apr 22, 2015 at 6:47 PM, Tathagata Das <t...@databricks.com> wrote:

Can you give the full code? Especially the myfunc?

On Wed, Apr 22, 2015 at 2:20 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:

Here's what I did:

    print 'BROADCASTING...'
    broadcastVar = sc.broadcast(mylist)
    print broadcastVar
    print broadcastVar.value
    print 'FINISHED BROADCASTING...'

The above works fine, but when I call myrdd.map(myfunc) I get

    NameError: global name 'broadcastVar' is not defined

The myfunc function is in a different module. How do I make it aware of broadcastVar?

On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:

Great. Will try to modify the code. Always room to optimize!

On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das <t...@databricks.com> wrote:

Absolutely. The same code would work for local as well as distributed mode!

On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:

Can I use broadcast vars in local mode?

On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das <t...@databricks.com> wrote:

Yep. Not efficient. Pretty bad actually. That's why broadcast variables were introduced right at the very beginning of Spark.

On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy <vadim.bichuts...@gmail.com> wrote:

Thanks TD. I was looking into broadcast variables. Right now I am running it locally... and I plan to move it to production on EC2. The way I fixed it is by doing

    myrdd.map(lambda x: (x, mylist)).map(myfunc)

but I don't think it's efficient? mylist is filled only once at the start and never changes.

Vadim

On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das <t...@databricks.com> wrote:

Is mylist present on every executor? If not, then you have to pass it on, and broadcasts are the best way to pass it on. But note that once broadcast, it will be immutable at the executors, and if you update the list at the driver, you will have to broadcast it again.

TD
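One way to avoid the NameError without a cross-module global is to make the broadcast handle an explicit parameter of myfunc and bind it with functools.partial before handing the function to map. This is an alternative to the cross-module variable suggested above, not what either poster wrote. The sketch below runs without Spark: FakeBroadcast is a hypothetical stand-in that only mimics the .value attribute of a broadcast variable, and the built-in map stands in for rdd.map.

```python
from functools import partial


class FakeBroadcast:
    """Hypothetical stand-in for a Spark broadcast variable (exposes .value only)."""

    def __init__(self, value):
        self.value = value


# Would live in process_stuff.py: the broadcast handle is an explicit argument,
# so the module does not need a global named broadcastVar.
def myfunc(x, broadcast_var):
    metadata = broadcast_var.value
    return (x, x in metadata)


# Would live in main.py: bind the handle once, then pass the resulting
# one-argument function to map().
broadcast_var = FakeBroadcast(["a", "b"])
bound = partial(myfunc, broadcast_var=broadcast_var)
result = list(map(bound, ["a", "z"]))

print(result)
```

The design choice here is simply that dependencies are passed in rather than looked up by name, which keeps process_stuff.py importable and testable on its own.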
RE: spark with kafka
Write the Kafka stream to HDFS via Spark Streaming, then ingest the files from HDFS with Spark.

-----Original Message-----
From: Shushant Arora [shushantaror...@gmail.com]
Sent: Saturday, April 18, 2015 06:44 AM Eastern Standard Time
To: user
Subject: spark with kafka

Hi,

I want to consume messages from a Kafka queue using a Spark batch program, not Spark Streaming. Is there any way to achieve this other than using the low-level (simple) API of the Kafka consumer?

Thanks
Re: mapPartitions - How Does it Works
mapPartitions works as follows:

1) For each partition of your RDD, it provides an iterator over the values within that partition.
2) You then define a function that operates on that iterator.

Thus if you do the following:

    val parallel = sc.parallelize(1 to 10, 3)
    parallel.mapPartitions(x => x.map(s => s + 1)).collect

you would get:

    res3: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)

In your example, x is not a pointer that traverses the iterator (e.g. with .next()); it's literally the Iterator object itself. Calling x.next once therefore returns only the first element of each partition.

On 3/18/15, 10:19 AM, ashish.usoni <ashish.us...@gmail.com> wrote:

I am trying to understand mapPartitions, but I am still not sure how it works. The example below creates three partitions:

    val parallel = sc.parallelize(1 to 10, 3)

and when we do

    parallel.mapPartitions(x => List(x.next).iterator).collect

it prints

    Array[Int] = Array(1, 4, 7)

Can someone please explain why it prints 1, 4, 7 only?

Thanks,

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/mapPartitions-How-Does-it-Works-tp22123.html
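To see where 1, 4, 7 comes from, here is a small plain-Python model of the two calls discussed above. It needs no Spark; slice_into_partitions and map_partitions are illustrative helpers, and the even slicing rule is an assumption meant to mirror how parallelize(1 to 10, 3) ends up with partitions starting at 1, 4 and 7.

```python
def slice_into_partitions(items, num_partitions):
    """Cut a list into contiguous chunks, one per simulated partition."""
    n = len(items)
    return [items[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]


def map_partitions(partitions, func):
    """Call func once per partition with an iterator, then concatenate (like collect)."""
    out = []
    for part in partitions:
        out.extend(func(iter(part)))
    return out


partitions = slice_into_partitions(list(range(1, 11)), 3)

# Equivalent of x => List(x.next).iterator: one next() call per partition.
first_only = map_partitions(partitions, lambda it: iter([next(it)]))

# Equivalent of x => x.map(s => s + 1): every element of every partition.
plus_one = map_partitions(partitions, lambda it: (s + 1 for s in it))

print(partitions)
print(first_only)
print(plus_one)
```

The function is invoked three times, once per partition, which is why a single next() yields exactly three values.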
RE: Brodcast Variable updated from one transformation and used from another
You're not using the broadcast variable within your map operations. You're attempting to modify myObject directly, which won't work because you are modifying the serialized copy on the executor. You want to do myObjectBroadcasted.value.insert and myObjectBroadcasted.value.lookup.

-----Original Message-----
From: Yiannis Gkoufas [johngou...@gmail.com]
Sent: Tuesday, February 24, 2015 12:12 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Brodcast Variable updated from one transformation and used from another

Hi all,

I am trying to do the following.

    val myObject = new MyObject();
    val myObjectBroadcasted = sc.broadcast(myObject);

    val rdd1 = sc.textFile("/file1").map(e => {
      myObject.insert(e._1);
      (e._1, 1)
    });
    rdd.cache.count(); //to make sure it is transformed.

    val rdd2 = sc.textFile("/file2").map(e => {
      val lookedUp = myObject.lookup(e._1);
      (e._1, lookedUp)
    });

When I check the contents of myObject within the map of rdd1, everything seems OK. On the other hand, when I check the contents of myObject within the map of rdd2, it seems to be empty. Am I doing something wrong?

Thanks a lot!
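The "serialized copy" point can be illustrated without Spark. In the sketch below, copy.deepcopy stands in for the serialize-and-ship step that happens when a closure referencing a driver-side object is sent to an executor; that substitution, the MyObject class body, and the variable names are all assumptions made for the illustration.

```python
import copy


class MyObject:
    """Hypothetical stand-in for the MyObject class from the question."""

    def __init__(self):
        self.items = set()

    def insert(self, key):
        self.items.add(key)

    def lookup(self, key):
        return key in self.items


driver_obj = MyObject()

# A task works on its own copy of whatever the closure captured;
# deepcopy models that copy here.
task_copy = copy.deepcopy(driver_obj)
task_copy.insert("k1")

copy_sees_key = task_copy.lookup("k1")      # the copy was modified
driver_sees_key = driver_obj.lookup("k1")   # the driver-side object was not

print(copy_sees_key, driver_sees_key)
```

A second task would start from a fresh copy of the driver-side object, which is consistent with the lookup in rdd2 finding nothing.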
RE: Spark job fails on cluster but works fine on a single machine
When writing to HDFS, Spark will not overwrite existing files or directories. You must either delete these manually or use Hadoop's Java FileSystem class to remove them.

-----Original Message-----
From: Pavel Velikhov [pavel.velik...@gmail.com]
Sent: Thursday, February 19, 2015 11:32 AM Eastern Standard Time
To: user@spark.apache.org
Subject: Spark job fails on cluster but works fine on a single machine

I have a simple Spark job that goes out to Cassandra, runs a pipe and stores results:

    val sc = new SparkContext(conf)
    val rdd = sc.cassandraTable("keyspace", "table")
      .map(r => r.getInt("column") + "\t" + write(get_lemmas(r.getString("tags"))))
      .pipe("python3 /tmp/scripts_and_models/scripts/run.py")
      .map(r => convertStr(r))
      .coalesce(1, true)
      .saveAsTextFile("/tmp/pavel/CassandraPipeTest.txt")
      //.saveToCassandra("keyspace", "table", SomeColumns("id", "data"))

When run on a single machine, everything is fine whether I save to an HDFS file or save to Cassandra. When run on a cluster, neither works:

- When saving to file, I get an exception: User class threw exception: Output directory hdfs://hadoop01:54310/tmp/pavel/CassandraPipeTest.txt already exists
- When saving to Cassandra, only 4 rows are updated with empty data (I test on a 4-machine Spark cluster)

Any hints on how to debug this and where the problem could be?

- I delete the HDFS file before running
- Would really like the output to HDFS to work, so I can debug
- Then it would be nice to save to Cassandra
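The refuse-to-overwrite behaviour and the explicit cleanup step can be mimicked on a local filesystem. The sketch below is only an analogy in plain Python: save_lines is a made-up helper, a temporary local directory stands in for HDFS, and shutil.rmtree plays the part that a FileSystem delete call would play in a real job.

```python
import os
import shutil
import tempfile


def save_lines(lines, output_dir, overwrite=False):
    """Write lines to output_dir/part-00000, refusing to clobber an existing directory."""
    if os.path.exists(output_dir):
        if not overwrite:
            raise FileExistsError("Output directory %s already exists" % output_dir)
        shutil.rmtree(output_dir)  # explicit cleanup, the step the job must do itself
    os.makedirs(output_dir)
    with open(os.path.join(output_dir, "part-00000"), "w") as f:
        f.write("\n".join(lines))


base = tempfile.mkdtemp()
target = os.path.join(base, "CassandraPipeTest.txt")

save_lines(["first run"], target)
try:
    save_lines(["second run"], target)      # same path again, no cleanup
    second_run_failed = False
except FileExistsError:
    second_run_failed = True

save_lines(["third run"], target, overwrite=True)
with open(os.path.join(target, "part-00000")) as f:
    final_content = f.read()

print(second_run_failed, final_content)
```

Note that the output path names a directory, not a file, so a cleanup that removes only a single file of that name would leave the directory in place.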
RE: RDD Partition number
As Ted Yu points out, the default block size is 128MB as of Hadoop 2.1.

-----Original Message-----
From: Ilya Ganelin [ilgan...@gmail.com]
Sent: Thursday, February 19, 2015 12:13 PM Eastern Standard Time
To: Alessandro Lulli; user@spark.apache.org
Cc: Massimiliano Bertolucci
Subject: Re: RDD Partition number

By default you will have (fileSize in MB / 64) partitions. You can also set the number of partitions when you read in a file with sc.textFile, as an optional second parameter.

On Thu, Feb 19, 2015 at 8:07 AM, Alessandro Lulli <lu...@di.unipi.it> wrote:

Hi All,

Could you please help me understand how Spark defines the number of partitions of an RDD if it is not specified? I found the following in the documentation for files loaded from HDFS:

"The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks"

What is the rule for files loaded from the file system? For instance, I have a file X replicated on 4 machines. If I load the file X into an RDD, how many partitions are defined, and why?

Thanks for your help on this,
Alessandro
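As a rough worked example of the "one partition per block" rule with the two block sizes mentioned in this thread, the arithmetic looks like this. The function name and the 1000 MB file size are made up for illustration, and the model deliberately ignores the minPartitions argument and any split-size tuning in the input format.

```python
import math


def default_partitions(file_size_mb, block_size_mb=64):
    """Approximate partition count: one partition per HDFS block, at least one."""
    return max(1, math.ceil(file_size_mb / block_size_mb))


# Hypothetical 1000 MB file under the two default block sizes discussed above.
with_64mb_blocks = default_partitions(1000, 64)
with_128mb_blocks = default_partitions(1000, 128)

print(with_64mb_blocks, with_128mb_blocks)
```

Doubling the block size roughly halves the default number of partitions for the same file.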
RE: Is there a fast way to do fast top N product recommendations for all users
Hi all - I've spent a while playing with this. Two significant sources of speed-up that I've achieved are:

1) Manually multiplying the feature vectors and caching either the user or the product vectors.
2) By doing so, if one of the RDDs is global, it becomes possible to parallelize this step by running it in a thread and submitting multiple threads to the YARN engine.

Doing so, I've achieved an over 75x speed-up compared with the packaged version inside MLlib.

-----Original Message-----
From: Sean Owen [so...@cloudera.com]
Sent: Thursday, February 12, 2015 05:47 PM Eastern Standard Time
To: Crystal Xing
Cc: user@spark.apache.org
Subject: Re: Is there a fast way to do fast top N product recommendations for all users

Not now, but see https://issues.apache.org/jira/browse/SPARK-3066

As an aside, it's quite expensive to make recommendations for all users. IMHO this is not something to do if you can avoid it architecturally. For example, consider precomputing recommendations only for users whose probability of needing recommendations soon is not very small. Usually, only a small number of users are active.

On Thu, Feb 12, 2015 at 10:26 PM, Crystal Xing <crystalxin...@gmail.com> wrote:

Hi,

I wonder if there is a way to do fast top N product recommendations for all users in training using MLlib's ALS algorithm.

I am currently calling the

    public Rating[] recommendProducts(int user, int num)

method in MatrixFactorizationModel for users one by one, and it is quite slow since it does not operate on RDD input.

I also tried to generate all possible user-product pairs and use

    public JavaRDD<Rating> predict(JavaPairRDD<Integer,Integer> usersProducts)

to fill out the matrix. Since I have a large number of users and products, the job gets stuck transforming all pairs.

I wonder if there is a better way to do this.

Thanks,
Crystal.
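"Manually multiplying the feature vectors" amounts to scoring each user/product pair with a dot product of their latent factors and keeping the best n per user. The following plain-Python sketch shows only that arithmetic on tiny made-up factor vectors; it is not the implementation described above, the function names are illustrative, and it does nothing about distribution, caching, or threading.

```python
import heapq


def dot(u, v):
    """Dot product of two equal-length factor vectors."""
    return sum(a * b for a, b in zip(u, v))


def top_n_for_all_users(user_features, product_features, n):
    """Score every product for every user and keep the n highest-scoring products."""
    recs = {}
    for user, uvec in user_features.items():
        scored = ((dot(uvec, pvec), product)
                  for product, pvec in product_features.items())
        recs[user] = [product for _, product in heapq.nlargest(n, scored)]
    return recs


# Made-up rank-2 factors for two users and three products.
user_features = {1: [1.0, 0.0], 2: [0.0, 1.0]}
product_features = {10: [0.9, 0.1], 20: [0.2, 0.8], 30: [0.5, 0.5]}

recs = top_n_for_all_users(user_features, product_features, 2)
print(recs)
```

Using a bounded heap per user avoids sorting the full product list, which matters when the number of products is large.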
RE: spark challenge: zip with next???
Make a copy of your RDD with an extra entry at the beginning to offset it. Then you can zip the two RDDs and run a map to generate an RDD of differences.

-----Original Message-----
From: derrickburns [derrickrbu...@gmail.com]
Sent: Thursday, January 29, 2015 02:52 PM Eastern Standard Time
To: user@spark.apache.org
Subject: spark challenge: zip with next???

Here is a spark challenge for you!

I have a data set where each entry has a date. I would like to identify gaps in the dates larger than a given length. For example, if the data were log entries, then the gaps would tell me when I was missing log data for long periods of time. What is the most efficient way to achieve this in Spark?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-challenge-zip-with-next-tp21423.html
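The offset-and-zip idea can be tried on a plain list first. In this sketch, which does not use Spark, the shifted copy is obtained by dropping the first element rather than prepending one, which pairs the same neighbours; find_gaps, the sample dates, and the three-day threshold are assumptions for illustration, and the data is sorted up front because the pairing only makes sense on ordered dates.

```python
from datetime import date, timedelta


def find_gaps(dates, min_gap):
    """Return (previous, next) pairs of consecutive dates that are more than min_gap apart."""
    ordered = sorted(dates)
    pairs = zip(ordered, ordered[1:])  # each date paired with its successor
    return [(a, b) for a, b in pairs if b - a > min_gap]


log_dates = [date(2015, 1, 10), date(2015, 1, 1), date(2015, 1, 2), date(2015, 1, 11)]
gaps = find_gaps(log_dates, timedelta(days=3))

print(gaps)
```

Each reported pair marks the last entry before a gap and the first entry after it.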
RE: quickly counting the number of rows in a partition?
An alternative to doing a naive toArray is to declare an accumulator per partition and use that. It's specifically what they were designed to do. See the programming guide.

-----Original Message-----
From: Tobias Pfeiffer [t...@preferred.jp]
Sent: Tuesday, January 13, 2015 08:06 PM Eastern Standard Time
To: Kevin Burton
Cc: Ganelin, Ilya; user@spark.apache.org
Subject: Re: quickly counting the number of rows in a partition?

Hi,

On Mon, Jan 12, 2015 at 8:09 PM, Ganelin, Ilya <ilya.gane...@capitalone.com> wrote:

> Use the mapPartitions function. It returns an iterator to each partition. Then just get that length by converting to an array.

On Tue, Jan 13, 2015 at 2:50 PM, Kevin Burton <bur...@spinn3r.com> wrote:

> Doesn't that just read in all the values? The count isn't pre-computed? It's not the end of the world if it's not but would be faster.

Well, converting to an array may not work due to memory constraints; counting the items in the iterator may be better. However, there is no pre-computed value. For counting, you need to compute all values in the RDD, in general. If you think of

    items.map(x => /* throw exception */).count()

then even though the count you want to get does not necessarily require the evaluation of the function in map() (i.e., the number is the same), you may not want to get the count if that code actually fails.

Tobias
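Tobias's suggestion of counting the items in the iterator, rather than converting to an array, looks like this in a plain-Python model. The helper name and the three sample partitions are made up; the point is only that the count is obtained by walking each iterator once, without materialising its contents.

```python
def count_per_partition(partitions):
    """Count items by walking each iterator; nothing is copied into a list or array."""
    return [sum(1 for _ in iter(part)) for part in partitions]


# Three simulated partitions of sizes 3, 3 and 4.
partitions = [range(0, 3), range(3, 6), range(6, 10)]
counts = count_per_partition(partitions)

print(counts)
```

Every element is still visited, so this saves memory rather than computation, in line with the remark that there is no pre-computed count.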
Re: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0
There are two related options.

To solve your problem directly, try:

    val conf = new SparkConf().set("spark.yarn.driver.memoryOverhead", "1024")
    val sc = new SparkContext(conf)

The second option increases the overall memory available on the driver. As part of your spark-submit script, add:

    --driver-memory 2g

Hope this helps!

From: David McWhorter <mcwhor...@ccri.com>
Date: Monday, January 12, 2015 at 11:01 AM
To: user@spark.apache.org
Subject: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0

Hi all,

I'm trying to figure out how to set this option on Spark 1.2.0: spark.yarn.driver.memoryOverhead.

I found this helpful overview, http://apache-spark-user-list.1001560.n3.nabble.com/Stable-spark-streaming-app-td14105.html#a14476, which suggests launching with --spark.yarn.driver.memoryOverhead 1024 added to spark-submit. However, when I do that I get this error:

    Error: Unrecognized option '--spark.yarn.driver.memoryOverhead'.
    Run with --help for usage help or --verbose for debug output

I have also tried calling sparkConf.set("spark.yarn.driver.memoryOverhead", "1024") on my Spark configuration object, but I still get "Will allocate AM container, with MB memory including 384 MB overhead" when launching. I'm running in yarn-cluster mode.

Any help or tips would be appreciated.

Thanks,
David

--
David McWhorter
Software Engineer
Commonwealth Computer Research, Inc.
1422 Sachem Place, Unit #1
Charlottesville, VA 22901
mcwhor...@ccri.com | 434.299.0090x204
RE: MatrixFactorizationModel serialization
Try loading features as Val userfeatures = sc.objectFile(path1) Val productFeatures = sc.objectFile(path2) And then call the constructor of the MatrixFsgtorizationModel with those. Sent with Good (www.good.com) -Original Message- From: wanbo [gewa...@163.commailto:gewa...@163.com] Sent: Wednesday, January 07, 2015 10:54 PM Eastern Standard Time To: user@spark.apache.org Subject: Re: MatrixFactorizationModel serialization I save and reload model like this: val bestModel = ALS.train(training, rank, numIter, lambda) bestModel.get.userFeatures.saveAsObjectFile(hdfs://***:9000/spark/results/userfeatures) bestModel.get.productFeatures.saveAsObjectFile(hdfs://***:9000/spark/results/productfeatures) val bestModel = obj.asInstanceOf[MatrixFactorizationModel] bestModel.userFeatures.sparkContext.objectFile(hdfs://***:9000/spark/results/userfeatures) bestModel.productFeatures.sparkContext.objectFile(hdfs://***:9000/spark/results/productfeatures) But, there has same exception: Exception in thread Driver java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:162) Caused by: java.lang.NullPointerException at com.ft.jobs.test.ModelDeserialization$.main(ModelDeserialization.scala:138) at com.ft.jobs.test.ModelDeserialization.main(ModelDeserialization.scala) ... 5 more Have fixed this issue? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/MatrixFactorizationModel-serialization-tp18389p21024.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
HDFS_DELEGATION_TOKEN errors after switching Spark Contexts
Hi all. To get Spark to properly release memory during batch processing, as a workaround for https://issues.apache.org/jira/browse/SPARK-4927, I tear down and re-initialize the Spark context with

    context.stop()
    context = new SparkContext()

The problem I run into is that eventually I hit the error below:

    15/01/06 13:52:34 INFO BlockManagerMaster: Updated info of block broadcast_5_piece0
    15/01/06 13:52:34 WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 214318 for zjb238) can't be found in cache
    Exception in thread "main" org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 214318 for zjb238) can't be found in cache

This terminates execution, and I have no idea why it happens. Does anyone know what could be at play here? The error appears as soon as I try to hit HDFS after restarting a Spark context. When it appears is not deterministic: I am able to run several successful iterations before I see it. Any help would be much appreciated. Thank you.
Re: Long-running job cleanup
The previously submitted code doesn’t actually show the problem effectively, since the issue only becomes clear between subsequent steps. Within a single step it appears things are cleaned up properly. Memory usage becomes evident pretty quickly.

    def showMemoryUsage(sc: SparkContext) = {
      val usersPerStep = 2500
      val count = 100
      val numSteps = count / usersPerStep
      val users = sc.parallelize(1 to count)
      val zippedUsers = users.zipWithIndex().cache()
      val userFeatures: RDD[(Int, Int)] = sc.parallelize(1 to count).map(s => (s, 2)).partitionBy(new HashPartitioner(200)).cache()
      val productFeatures: RDD[(Int, Int)] = sc.parallelize(1 to 100).map(s => (s, 4)).repartition(1).cache()
      for (i <- 1 to numSteps) {
        val usersFiltered = zippedUsers.filter(s => {
          ((i - 1) * usersPerStep <= s._2) && (s._2 < i * usersPerStep)
        }).map(_._1).collect()
        val results = usersFiltered.map(user => {
          val userScore = userFeatures.lookup(user).head
          val recPerUser = Array(1, 2, userScore)
          recPerUser
        })
        val mapedResults: Array[Int] = results.flatMap(scores => scores).toArray
        log("State: Computed " + mapedResults.length + " predictions for stage " + i)
        sc.parallelize(mapedResults)
        // Write to disk (left out since problem is evident even without it)
      }
    }

Example broadcast variable added:

    14/12/30 19:25:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on CLIENT_NODE:54640 (size: 794.0 B, free: 441.9 MB)

If I parse the entire log looking for “free : XXX.X MB”, then within a single step memory is cleared properly:

    Free 441.1 MB Free 439.8 MB Free 439.8 MB Free 441.1 MB Free 441.1 MB Free 439.8 MB

But between steps the amount of available memory decreases (i.e. the range that the values oscillate between shrinks), and over the course of many hours this eventually reduces to zero.

    Free 440.7 MB Free 438.7 MB Free 438.7 MB Free 440.7 MB
    Free 435.4 MB Free 425.0 MB Free 425.0 MB Free 435.4 MB Free 425.0 MB Free 425.0 MB Free 435.4 MB
    Free 426.7 MB Free 402.5 MB Free 402.5 MB Free 426.7 MB Free 426.7 MB Free 402.5 MB Free 402.5 MB Free 426.7 MB

From: Ganelin, Ilya ilya.gane...@capitalone.com
Date: Tuesday, December 30, 2014 at 7:30 PM
To: Ilya Ganelin ilgan...@gmail.com, Patrick Wendell pwend...@gmail.com
Cc: user@spark.apache.org
Subject: Re: Long-running job cleanup
Re: Long-running job cleanup
Hi Patrick, to follow up on the below discussion, I am including a short code snippet that produces the problem on 1.1. This is kind of stupid code since it’s a greatly simplified version of what I’m actually doing, but it has a number of the key components in place. I’m also including some example log output. Thank you.

    def showMemoryUsage(sc: SparkContext) = {
      val usersPerStep = 25000
      val count = 100
      val numSteps = count / usersPerStep
      val users = sc.parallelize(1 to count)
      val zippedUsers = users.zipWithIndex().cache()
      val userFeatures: RDD[(Int, Int)] = sc.parallelize(1 to count).map(s => (s, 2)).cache()
      val productFeatures: RDD[(Int, Int)] = sc.parallelize(1 to 5000).map(s => (s, 4)).cache()
      for (i <- 1 to numSteps) {
        val usersFiltered = zippedUsers.filter(s => {
          ((i - 1) * usersPerStep <= s._2) && (s._2 < i * usersPerStep)
        }).map(_._1).collect()
        usersFiltered.foreach(user => {
          val mult = productFeatures.map(s => s._2 * userFeatures.lookup(user).head)
          mult.takeOrdered(20)
          // Normally this would then be written to disk
          // For the sake of the example this is all we're doing
        })
      }
    }

Example broadcast variable added:

    14/12/30 19:25:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on innovationclientnode01.cof.ds.capitalone.com:54640 (size: 794.0 B, free: 441.9 MB)

And then if I parse the entire log looking for “free : XXX.X MB” I see the available memory slowly ticking away:

    Free 441.8 MB Free 441.8 MB Free 441.8 MB Free 441.8 MB Free 441.8 MB Free 441.8 MB Free 441.8 MB
    …
    Free 441.7 MB Free 441.7 MB Free 441.7 MB Free 441.7 MB

And so on. Clearly the above code is not persisting the intermediate RDD (mult), yet memory is never being properly freed up.
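The log check described above can be sketched in plain Scala. This is only a minimal sketch, assuming the driver log is available as a sequence of lines in the BlockManagerInfo format quoted in the message; the object name FreeMemoryLog and its helper methods are hypothetical and are not part of Spark.

```scala
// Hypothetical helper (not a Spark API): pulls the "free: N MB" readings out of log lines.
object FreeMemoryLog {
  // Matches e.g. "free: 441.9 MB"; a space before the colon is tolerated.
  private val FreePattern = """free\s*:\s*([0-9]+(?:\.[0-9]+)?)\s*MB""".r

  /** Returns the free-memory readings (in MB) in the order they appear in the log. */
  def freeMb(lines: Seq[String]): List[Double] =
    lines.toList.flatMap(line => FreePattern.findFirstMatchIn(line).map(_.group(1).toDouble))

  /** Splits the readings into windows of `size` and reports (min, max) for each window. */
  def rangePerWindow(readings: Seq[Double], size: Int): List[(Double, Double)] = {
    require(size > 0, "window size must be positive")
    readings.grouped(size).map(w => (w.min, w.max)).toList
  }
}
```

Comparing the (min, max) pairs of successive windows shows whether the range the readings oscillate in is drifting downward, which is the symptom reported here.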
From: Ilya Ganelin ilgan...@gmail.com
Date: Sunday, December 28, 2014 at 4:02 PM
To: Patrick Wendell pwend...@gmail.com, Ganelin, Ilya ilya.gane...@capitalone.com
Cc: user@spark.apache.org
Subject: Re: Long-running job cleanup

Hi Patrick - is that cleanup present in 1.1? The overhead I am talking about is with regard to what I believe is shuffle-related metadata. If I watch the execution log I see small broadcast variables created for every stage of execution, a few KB at a time, and a certain number of MB remaining of available memory on the driver. As I run, this available memory goes down, and these variables are never erased.

The only RDDs that persist are those that are explicitly cached. The RDDs that are generated iteratively are not retained or referenced, so I would expect things to get cleaned up, but they do not. The items consuming memory are not RDDs but what appears to be shuffle metadata. I have a script that parses the logs to show memory consumption over time, and it shows memory being consumed very steadily, one small bit at a time, over many hours without ever being cleared.

The specific computation I am doing is the generation of dot products between two RDDs of vectors. I need to generate this product for every combination of vectors between the two RDDs, but both RDDs are too big to fit in memory. Consequently, I iteratively generate this product across one entry from the first RDD and all entries from the second, and retain the pared-down result within an accumulator (by retaining only the top N results it is possible to actually store the Cartesian product, which is otherwise too large to fit on disk). After a certain number of iterations these intermediate results are then written to disk.

Each of these steps is tractable in itself, but due to the accumulation of memory the overall job becomes intractable. I would appreciate any suggestions as to how to clean up these intermediate broadcast variables. Thank you.

On Sun, Dec 28, 2014 at 1:56 PM Patrick Wendell pwend...@gmail.com wrote:

What do you mean when you say the overhead of Spark shuffles starts to accumulate? Could you elaborate more? In newer versions of Spark, shuffle data is cleaned up automatically when an RDD goes out of scope. It is safe to remove shuffle data at this point because the RDD can no longer be referenced. If you are seeing a large build-up of shuffle data, it's possible you are retaining references to older RDDs inadvertently. Could you explain what your job is actually doing?

- Patrick

On Mon, Dec 22, 2014 at 2:36 PM, Ganelin, Ilya ilya.gane...@capitalone.com wrote:

Hi all, I have a long running job iterating over a huge dataset. Parts of this operation are cached. Since the job runs for so long, eventually the overhead of Spark shuffles starts to accumulate, culminating in the driver starting to swap. I am aware of the spark.cleaner.ttl parameter that allows me to configure when cleanup happens but the issue
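The "retain only the top N results" step mentioned in this thread can be illustrated without Spark. The following is a minimal sketch under stated assumptions: vectors are plain Array[Double], product identifiers are Int, and the names TopNDotProducts, dot and topN are hypothetical. It is not the code used in the job discussed here.

```scala
import scala.collection.mutable

// Hypothetical illustration: score one vector against many and keep only the n best.
object TopNDotProducts {
  /** Plain dot product of two equally sized vectors. */
  def dot(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "vectors must have the same length")
    var sum = 0.0
    var i = 0
    while (i < a.length) { sum += a(i) * b(i); i += 1 }
    sum
  }

  /** Scores `user` against every (id, vector) pair and returns the n highest scores, best first. */
  def topN(user: Array[Double], products: Iterator[(Int, Array[Double])], n: Int): List[(Int, Double)] = {
    // The ordering is inverted so that the queue head is the weakest result kept so far.
    val weakestFirst = Ordering.fromLessThan[(Int, Double)](_._2 > _._2)
    val heap = mutable.PriorityQueue.empty[(Int, Double)](weakestFirst)
    products.foreach { case (id, vec) =>
      val score = dot(user, vec)
      if (heap.size < n) heap.enqueue((id, score))
      else if (n > 0 && score > heap.head._2) {
        heap.dequeue()            // drop the current weakest result
        heap.enqueue((id, score)) // keep the better one instead
      }
    }
    heap.toList.sortWith(_._2 > _._2)
  }
}
```

Because the queue never holds more than n entries, the memory needed per user stays bounded no matter how many product vectors are scanned.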
Long-running job cleanup
Hi all, I have a long running job iterating over a huge dataset. Parts of this operation are cached. Since the job runs for so long, eventually the overhead of Spark shuffles starts to accumulate, culminating in the driver starting to swap.

I am aware of the spark.cleaner.ttl parameter that allows me to configure when cleanup happens, but the issue with doing this is that it isn’t done safely, e.g. I can be in the middle of processing a stage when this cleanup happens and my cached RDDs get cleared. This ultimately causes a KeyNotFoundException when I try to reference the now cleared cached RDD. This behavior doesn’t make much sense to me. I would expect the cached RDD to either get regenerated or, at the very least, for there to be an option to execute this cleanup without deleting those RDDs.

Is there a programmatically safe way of doing this cleanup that doesn’t break everything? If I instead tear down the Spark context and bring up a new context for every iteration (assuming that each iteration is sufficiently long-lived), would memory get released appropriately?
Understanding disk usage with Accumulators
Hi all – I’m running a long-running batch-processing job with Spark through YARN. I am doing the following:

Batch Process

    val resultsArr = sc.accumulableCollection(mutable.ArrayBuffer[ListenableFuture[Result]]())
    InMemoryArray.forEach {
      1) Using a thread pool, generate callable jobs that operate on an RDD
         1a) These callable jobs perform an operation combining that RDD and a broadcasted array and store the result of that computation as an Array (Result)
      2) Store the results of this operation (upon resolution) in the accumulableCollection
    }
    sc.parallelize(resultsArr).saveAsObjectFile (about 1 GB of data)

This happens a total of about 4 times during execution over the course of several hours.

My immediate problem is that during this execution two things happen. Firstly, on my driver node I eventually run out of memory and start swapping to disk (which causes slowdown). However, each Batch can be processed entirely within the available memory on the driver, so basically this memory is somehow not being released between runs (even though I leave the context of the function running the Batch process).

Secondly, during execution, things are being written to HDFS and I am running out of space on the local partitions on the node. Note, this is NOT the explicit saveAsObjectFile call that I am making, but appears to be something going on with Spark internally.

Can anyone speak to what is going on under the hood here and what I can do to resolve this?
Re: Understanding disk usage with Accumulators
Also, this may be related to this issue https://issues.apache.org/jira/browse/SPARK-3885. Further, to clarify, data is being written to Hadoop on the data nodes. Would really appreciate any help. Thanks!

From: Ganelin, Ilya ilya.gane...@capitalone.com
Date: Tuesday, December 16, 2014 at 10:23 AM
To: user@spark.apache.org
Subject: Understanding disk usage with Accumulators
Re: MLLib in Production
Hi all – I’ve been storing the model's userFeatures and productFeatures vectors, which are generated internally, serialized on disk and importing them in a separate job.

From: Sonal Goyal sonalgoy...@gmail.com
Date: Wednesday, December 10, 2014 at 5:31 AM
To: Yanbo Liang yanboha...@gmail.com
Cc: Simon Chan simonc...@gmail.com, Klausen Schaefersinho klaus.schaef...@gmail.com, user@spark.apache.org
Subject: Re: MLLib in Production

You can also serialize the model and use it in other places.

Best Regards,
Sonal
Founder, Nube Technologies http://www.nubetech.co
http://in.linkedin.com/in/sonalgoyal

On Wed, Dec 10, 2014 at 5:32 PM, Yanbo Liang yanboha...@gmail.com wrote:

Hi Klaus, there is no ideal method, but there are some workarounds. Train the model in a Spark cluster or YARN cluster, then use RDD.saveAsTextFile to store the model, which includes the weights and intercept, to HDFS. Load the weights file and intercept file from HDFS, construct a GLM model, and then run the model.predict() method to get what you want. The Spark community also has some ongoing work on exporting models with PMML.

2014-12-10 18:32 GMT+08:00 Simon Chan simonc...@gmail.com:

Hi Klaus, PredictionIO is an open source product based on Spark MLlib for exactly this purpose. This is the tutorial for classification in particular: http://docs.prediction.io/classification/quickstart/ You can add custom serving logic and retrieve prediction results through REST API/SDKs at other places.

Simon

On Wed, Dec 10, 2014 at 2:25 AM, Klausen Schaefersinho klaus.schaef...@gmail.com wrote:

Hi, I would like to use Spark to train a model, but use the model in some other place, e.g. a servlet to do some classification in real time. What is the best way to do this? Can I just copy a model file or something and load it in the servlet? Can anybody point me to a good tutorial?

Cheers,
Klaus

--
“Overfitting” is not about an excessive amount of physical exercise...
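The weights-and-intercept workaround suggested in this thread can be illustrated in plain Scala, independent of MLlib. This is a minimal sketch under stated assumptions: the model is a linear one with a logistic link, it is stored as a single comma-separated text line with the intercept first, and the names ExportedLinearModel, encode, decode, margin and predictProbability are all hypothetical rather than MLlib APIs.

```scala
// Hypothetical stand-in for an exported linear model (not an MLlib class).
object ExportedLinearModel {
  /** Serializes the intercept and weights as one comma-separated line, intercept first. */
  def encode(intercept: Double, weights: Array[Double]): String =
    (intercept +: weights).mkString(",")

  /** Parses a line produced by `encode` back into (intercept, weights). */
  def decode(line: String): (Double, Array[Double]) = {
    val values = line.split(",").map(_.trim.toDouble)
    require(values.nonEmpty, "expected at least the intercept")
    (values.head, values.tail)
  }

  /** Linear margin: intercept + w . x */
  def margin(intercept: Double, weights: Array[Double], features: Array[Double]): Double = {
    require(weights.length == features.length, "dimension mismatch")
    intercept + weights.zip(features).map { case (w, x) => w * x }.sum
  }

  /** Logistic-style probability of the positive class for one feature vector. */
  def predictProbability(intercept: Double, weights: Array[Double], features: Array[Double]): Double =
    1.0 / (1.0 + math.exp(-margin(intercept, weights, features)))
}
```

The serving side (for example a servlet) would only need `decode` and `predictProbability`, so it carries no dependency on the cluster that trained the model.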
RE: Spark executor lost
You want to look further up the stack (there are almost certainly other errors before this happens), and those other errors may give you a better idea of what is going on. Also, if you are running on YARN you can run

    yarn logs -applicationId <yourAppId>

to get the logs from the data nodes.

-Original Message-
From: S. Zhou [myx...@yahoo.com.INVALID]
Sent: Wednesday, December 03, 2014 06:30 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Spark executor lost

We are using Spark job server to submit Spark jobs (our Spark version is 0.9.1). After running the Spark job server for a while, we often see the following errors (executor lost) in the Spark job server log. As a consequence, the Spark driver (allocated inside the Spark job server) gradually loses executors, and finally the Spark job server is no longer able to submit jobs. We tried to google for solutions but so far no luck. Please help if you have any ideas. Thanks!

    [2014-11-25 01:37:36,250] INFO parkDeploySchedulerBackend [] [akka://JobServer/user/context-supervisor/next-staging] - Executor 6 disconnected, so removing it
    [2014-11-25 01:37:36,252] ERROR cheduler.TaskSchedulerImpl [] [akka://JobServer/user/context-supervisor/next-staging] - Lost executor 6 on : remote Akka client disassociated
    [2014-11-25 01:37:36,252] INFO ark.scheduler.DAGScheduler [] [] - Executor lost: 6 (epoch 8)
    [2014-11-25 01:37:36,252] INFO ge.BlockManagerMasterActor [] [] - Trying to remove executor 6 from BlockManagerMaster.
    [2014-11-25 01:37:36,252] INFO storage.BlockManagerMaster [] [] - Removed 6 successfully in removeExecutor
    [2014-11-25 01:37:36,286] INFO ient.AppClient$ClientActor [] [akka://JobServer/user/context-supervisor/next-staging] - Executor updated: app-20141125002023-0037/6 is now FAILED (Command exited with code 143)
SaveAsTextFile brings down data nodes with IO Exceptions
Hi all, as the last stage of execution, I am writing out a dataset to disk. Before I do this, I force the DAG to resolve so this is the only job left in the pipeline. The dataset in question is not especially large (a few gigabytes). During this step, however, HDFS will inevitably crash. I will lose connection to data nodes and get stuck in the loop of death – failure causes job restart, eventually causing the overall job to fail. In the data node logs I see the errors below. Does anyone have any ideas as to what is going on here? Thanks!

    java.io.IOException: Premature EOF from inputStream
        at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:194)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
        at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:455)
        at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:741)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:718)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:72)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:225)
        at java.lang.Thread.run(Thread.java:745)

    innovationdatanode03.cof.ds.capitalone.com:1004:DataXceiver error processing WRITE_BLOCK operation src: /10.37.248.60:44676 dst: /10.37.248.59:1004
    java.net.SocketTimeoutException: 65000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.37.248.59:43692 remote=/10.37.248.63:1004]
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2101)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:660)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
        at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:72)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:225)
        at java.lang.Thread.run(Thread.java:745)

    DataNode{data=FSDataset{dirpath='[/opt/cloudera/hadoop/1/dfs/dn/current, /opt/cloudera/hadoop/10/dfs/dn/current, /opt/cloudera/hadoop/2/dfs/dn/current, /opt/cloudera/hadoop/3/dfs/dn/current, /opt/cloudera/hadoop/4/dfs/dn/current, /opt/cloudera/hadoop/5/dfs/dn/current, /opt/cloudera/hadoop/6/dfs/dn/current, /opt/cloudera/hadoop/7/dfs/dn/current, /opt/cloudera/hadoop/8/dfs/dn/current, /opt/cloudera/hadoop/9/dfs/dn/current]'}, localName='innovationdatanode03.cof.ds.capitalone.com:1004', datanodeUuid='e8a11fe2-300f-4e78-9211-f2ee41af6b8c', xmitsInProgress=0}:Exception transfering block BP-1458718292-10.37.248.67-1398976716371:blk_1076854538_3118445 to mirror 10.37.248.63:1004: java.net.SocketTimeoutException: 65000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/10.37.248.59:43692 remote=/10.37.248.63:1004]
Re: ALS failure with size Integer.MAX_VALUE
Hi Bharath – I’m unsure if this is your problem, but the MatrixFactorizationModel in MLlib, which is the underlying component for ALS, expects your User/Product fields to be integers. Specifically, the input to ALS is an RDD[Rating] and Rating is an (Int, Int, Double). I am wondering if perhaps one of your identifiers exceeds Integer.MAX_VALUE; could you write a quick check for that? I have been running a very similar use case to yours (with more constrained hardware resources) and I haven’t seen this exact problem, but I’m sure we’ve seen similar issues. Please let me know if you have other questions.

From: Bharath Ravi Kumar reachb...@gmail.com
Date: Thursday, November 27, 2014 at 1:30 PM
To: user@spark.apache.org
Subject: ALS failure with size Integer.MAX_VALUE

We're training a recommender with ALS in MLlib 1.1 against a dataset of 150M users and 4.5K items, with the total number of training records being 1.2 billion (~30GB data). The input data is spread across 1200 partitions on HDFS. For the training, rank=10, and we've configured {number of user data blocks = number of item data blocks}. The number of user/item blocks was varied between 50 and 1200. Irrespective of the block size (e.g. at 1200 blocks each), there are at least a couple of tasks that end up shuffle reading 9.7G each in the aggregate stage (ALS.scala:337) and failing with the following exception:

    java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
        at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
        at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)

As for the data, on the user side, the degree of a node in the connectivity graph is relatively small. However, on the item side, 3.8K out of the 4.5K items are connected to 10^5 users each on average, with 100 items being connected to nearly 10^8 users. The rest of the items are connected to fewer than 10^5 users. With such a skew in the connectivity graph, I'm unsure if additional memory or variation in the block sizes would help (considering my limited understanding of the implementation in MLlib). Any suggestions to address the problem?

The test is being run on a standalone cluster of 3 hosts, each with 100G RAM and 24 cores dedicated to the application. The additional configs I made specific to the shuffle and task failure reduction are as follows:

    spark.core.connection.ack.wait.timeout=600
    spark.shuffle.consolidateFiles=true
    spark.shuffle.manager=SORT

The job execution summary is as follows:

Active Stages:

    Stage id 2, aggregate at ALS.scala:337, duration 55 min, Tasks 1197/1200 (3 failed), Shuffle Read: 141.6 GB

Completed Stages (5):

    Stage Id | Description                                     | Duration | Tasks: Succeeded/Total | Input   | Shuffle Read | Shuffle Write
    6        | org.apache.spark.rdd.RDD.flatMap(RDD.scala:277) | 12 min   | 1200/1200              | 29.9 GB | 1668.4 MB    | 186.8 GB
    5        | mapPartitionsWithIndex at ALS.scala:250         |          |                        |         |              |
    3        | map at ALS.scala:231                            |          |                        |         |              |
    0        | aggregate at ALS.scala:337                      |          |                        |         |              |
    1        | map at ALS.scala:228                            |          |                        |         |              |

Thanks,
Bharath
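The quick check suggested in the reply above could look like the following. This is a minimal sketch, assuming the raw user and item identifiers are available as Long values before they are narrowed to Int for Rating; the names IdRangeCheck, fitsInInt, outOfRange and exceedsMapLimit are hypothetical. The last helper reflects the documented constraint that a single java.nio FileChannel.map call cannot map more than Integer.MAX_VALUE bytes, which is where the exception in the stack trace is thrown.

```scala
// Hypothetical helpers for sanity-checking ALS input sizes (not part of MLlib).
object IdRangeCheck {
  /** True if the identifier can be stored in an Int without overflow. */
  def fitsInInt(id: Long): Boolean = id >= Int.MinValue && id <= Int.MaxValue

  /** Returns the identifiers that would overflow if narrowed to Int. */
  def outOfRange(ids: Iterator[Long]): List[Long] = ids.filterNot(fitsInInt).toList

  /** True if a block of this many bytes is too large for one FileChannel.map call. */
  def exceedsMapLimit(bytes: Long): Boolean = bytes > Int.MaxValue
}
```

With 150M users the identifiers themselves fit comfortably in an Int, so a non-empty result from `outOfRange` would point at an encoding problem in the input, while a 9.7 GB shuffle block exceeds the mapping limit regardless of the identifier values.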
Cancelled Key Exceptions on Massive Join
Hello all. I have been running a Spark job that eventually needs to do a large join (24 million x 150 million). A broadcast join is clearly infeasible in this instance, so I am instead attempting to do it with hash partitioning by defining a custom partitioner as:

class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) {
  override def getPartition(key: Any): Int = key match {
    case k: Tuple2[Int, String] => super.getPartition(k._1)
    case _ => super.getPartition(key)
  }
}

I then partition both arrays using this partitioner. However, the job eventually fails with the following exception. If I had to guess, it indicates that a network connection was interrupted during the shuffle stage, causing things to get lost and ultimately resulting in a fetch failure:

14/11/14 12:56:21 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(innovationdatanode08.cof.ds.capitalone.com,37590)
14/11/14 12:56:21 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@7369b398
14/11/14 12:56:21 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@7369b398
java.nio.channels.CancelledKeyException
    at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
    at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)

In the Spark UI, I still see a substantial amount of shuffling going on at this stage. I am wondering if I'm perhaps using the partitioner incorrectly?
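For anyone following along, here is a minimal sketch of the routing rule that partitioner describes, written in plain Scala so it runs without a cluster. The names FirstElementPartitioning, nonNegativeMod and partitionFor are made up for illustration, and the non-negative modulo of hashCode is my understanding of what HashPartitioner does, not a copy of its source.

```scala
// Hypothetical stand-in for hash partitioning on the first tuple element.
object FirstElementPartitioning {
  // Modulo that never returns a negative partition index.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Tuples are routed by their first element only; any other key is
  // routed by the whole key. A null key goes to partition 0.
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null                        => 0
    case (first, _) if first != null => nonNegativeMod(first.hashCode, numPartitions)
    case other                       => nonNegativeMod(other.hashCode, numPartitions)
  }
}
```

The point of routing on the first element is that (7, "a") and (7, "b") land in the same partition. For that to reduce shuffling in a join, both sides presumably need to be partitioned with an equal partitioner and the same number of partitions.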
RE: Spark- How can I run MapReduce only on one partition in an RDD?
Why do you only want the third partition? You can access individual partitions using the partitions() function. You can also filter your data using the filter() function so that it contains only the data you care about. Moreover, unless you define a custom partitioner when you create your RDDs, you have no way of controlling what data ends up in partition #3. Therefore, there is almost no reason to want to operate on an individual partition.

-Original Message-
From: Tim Chou [timchou@gmail.com]
Sent: Thursday, November 13, 2014 06:01 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Spark- How can I run MapReduce only on one partition in an RDD?

Hi All,

I use textFile to create an RDD. However, I don't want to handle the whole data in this RDD. For example, maybe I only want to process the data in the 3rd partition of the RDD. How can I do it? Here are some possible solutions that I'm thinking of:

1. Create multiple RDDs when reading the file.
2. Run MapReduce functions on a specific partition of an RDD. However, I cannot find any appropriate function.

Thank you, and I look forward to your suggestions.

Best,
Tim
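If operating on a single partition really is what's wanted, one way to sketch it is a per-partition function that passes one index through and empties the rest. The function below has the (Int, Iterator[T]) => Iterator[T] shape that RDD.mapPartitionsWithIndex takes. SinglePartition, keepOnly and run are hypothetical names, and run is only a local stand-in for an RDD's partitions so the sketch can be tried without Spark.

```scala
object SinglePartition {
  // Keep the records of one partition and drop every other partition.
  def keepOnly[T](target: Int)(index: Int, records: Iterator[T]): Iterator[T] =
    if (index == target) records else Iterator.empty

  // Local stand-in: each inner Seq plays the role of one partition.
  def run[T](partitions: Seq[Seq[T]], target: Int): Seq[T] =
    partitions.zipWithIndex.flatMap { case (part, idx) =>
      keepOnly[T](target)(idx, part.iterator)
    }
}

// With a real RDD this would look roughly like (assumption, untested here):
//   rdd.mapPartitionsWithIndex(SinglePartition.keepOnly[String](2) _)
// Partition indices are zero-based, so the "3rd partition" is index 2.
```

Note that the other partitions are still scheduled as tasks; they just produce no output.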
RE: Fwd: Why is Spark not using all cores on a single machine?
To set the number of Spark cores used you must set two parameters in the actual spark-submit script: num-executors (the number of executors to launch) and executor-cores (the number of cores per executor). Please see the Spark configuration and tuning pages for more details.

-Original Message-
From: ll [duy.huynh@gmail.com]
Sent: Saturday, November 08, 2014 12:05 AM Eastern Standard Time
To: u...@spark.incubator.apache.org
Subject: Re: Fwd: Why is Spark not using all cores on a single machine?

Hi. I did use local[8] as below, but it still ran on only 1 core.

val sc = new SparkContext(new SparkConf().setMaster("local[8]").setAppName("abc"))

Any advice is much appreciated.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Why-is-Spark-not-using-all-cores-on-a-single-machine-tp1638p18397.html
Re: Redploying a spark streaming application
You've basically got it. The deployment step can be as simple as scp-ing the file to a known location on the server and then executing a run script on the server that actually runs the spark-submit.

From: Ashic Mahtab as...@live.com
Date: Thursday, November 6, 2014 at 5:01 PM
To: user@spark.apache.org
Subject: Redploying a spark streaming application

Hello,

I'm trying to find the best way of redeploying a Spark Streaming application. Ideally, I was thinking of a scenario where a build server packages up a jar and a deployment step submits it to a Spark Master. On the next successful build, the next version would get deployed, taking down the previous version. What would be the best way of achieving this?

Thanks,
Ashic.
Re: Key-Value decomposition
Very straightforward: you want to use cartesian. If you have two RDDs, RDD_1("A") and RDD_2(1, 2, 3), then RDD_1.cartesian(RDD_2) will generate the cross product between the two RDDs and you will have RDD_3(("A", 1), ("A", 2), ("A", 3)).

On 11/3/14, 11:38 AM, david david...@free.fr wrote:

Hi,

I'm a newbie in Spark and face the following use case:

val data = Array(("A", "1;2;3"))
val rdd = sc.parallelize(data)
// Something here to produce RDD of (Key, value)
// (A, 1), (A, 2), (A, 3)

Does anybody know how to do this?

Thanks

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Key-Value-decomposition-tp17966.html
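Since the question starts from a single record ("A", "1;2;3") rather than two separate RDDs, a flatMap over the split values may be closer to what was asked than cartesian. A minimal sketch in plain Scala follows; KeyValueDecomposition and decompose are hypothetical names, and the input shape (a key paired with a ';'-separated string) is my reading of the question.

```scala
object KeyValueDecomposition {
  // Split a ';'-separated value string and pair every piece with its key.
  def decompose(record: (String, String)): Seq[(String, String)] = {
    val (key, values) = record
    values.split(";").toSeq.map(v => (key, v.trim))
  }
}

// With Spark the same function could be passed to flatMap (assumption, untested here):
//   sc.parallelize(data).flatMap(KeyValueDecomposition.decompose)
```

The same function works on a local collection, which makes it easy to check before running it on a cluster.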
RE: Repartitioning by partition size, not by number of partitions.
Hi Jan. I've actually written a function recently to do precisely that using the RDD.randomSplit function. You just need to calculate how big each element of your data is, then how many elements can fit in each RDD, and use that to populate the input to randomSplit. Unfortunately, in my case I wind up with GC errors on large data doing this and am still debugging :)

-Original Message-
From: jan.zi...@centrum.cz [jan.zi...@centrum.cz]
Sent: Friday, October 31, 2014 06:27 AM Eastern Standard Time
To: user@spark.apache.org
Subject: Repartitioning by partition size, not by number of partitions.

Hi,

My input data consists of many very small files, each containing one .json. For performance reasons (I use PySpark) I have to repartition. Currently I do:

sc.textFile(files).coalesce(100)

The problem is that I have to guess the number of partitions in such a way that it's as fast as possible while I stay on the safe side with RAM. This is quite difficult. For this reason I would like to ask whether there is some way to replace coalesce(100) with something that creates N partitions of a given size. I went through the documentation, but I was not able to find a way to do that.

Thank you in advance for any help or advice.
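The arithmetic behind "partitions of a given size" can be sketched as a ceiling division of the total data size by the target partition size. This is only an illustration: PartitionSizing, partitionsFor and equalWeights are hypothetical names, the total size is assumed to be known or estimated up front, and the thread itself is about PySpark while this sketch is in Scala.

```scala
object PartitionSizing {
  // Number of partitions so that each holds at most targetBytes (ceiling division).
  def partitionsFor(totalBytes: Long, targetBytes: Long): Int = {
    require(totalBytes >= 0 && targetBytes > 0, "sizes must be non-negative and target positive")
    val n = math.max(1L, (totalBytes + targetBytes - 1) / targetBytes)
    require(n <= Int.MaxValue, "too many partitions")
    n.toInt
  }

  // Equal weights in the Array[Double] shape that RDD.randomSplit takes.
  def equalWeights(n: Int): Array[Double] = Array.fill(n)(1.0)
}

// Possible use (assumption, untested here):
//   val n = PartitionSizing.partitionsFor(totalBytes, 128L * 1024 * 1024)
//   rdd.coalesce(n)   // or rdd.randomSplit(PartitionSizing.equalWeights(n))
```

Keeping n modest matters: as the GC errors mentioned above suggest, a very large number of splits can become a problem of its own.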
RE: FileNotFoundException in appcache shuffle files
Hi Ryan - I've been fighting the exact same issue for well over a month now. I initially saw the issue in 1.0.2 but it persists in 1.1.

Jerry - I believe you are correct that this happens during a pause on long-running jobs on a large data set. Are there any parameters that you suggest tuning to mitigate these situations? Also, you ask if there are any other exceptions - for me this error has tended to follow an earlier exception, which supports the theory that it is a symptom of an earlier problem.

My understanding is as follows: during a shuffle step an executor fails and doesn't report its output. Next, during the reduce step, that output can't be found where expected, and rather than rerunning the failed execution, Spark goes down.

We can add my email thread to your reference list: https://mail-archives.apache.org/mod_mbox/incubator-spark-user/201410.mbox/CAM-S9zS-+-MSXVcohWEhjiAEKaCccOKr_N5e0HPXcNgnxZd=h...@mail.gmail.com

-Original Message-
From: Shao, Saisai [saisai.s...@intel.com]
Sent: Wednesday, October 29, 2014 01:46 AM Eastern Standard Time
To: Ryan Williams
Cc: user
Subject: RE: FileNotFoundException in appcache shuffle files

Hi Ryan,

This is an issue from sort-based shuffle, not consolidated hash-based shuffle. I guess this issue mostly occurs when the Spark cluster is in an abnormal state, perhaps a long GC pause or something similar. You can check the system status, or whether there are any other exceptions besides this one.
Thanks
Jerry

From: nobigdealst...@gmail.com [mailto:nobigdealst...@gmail.com] On Behalf Of Ryan Williams
Sent: Wednesday, October 29, 2014 1:31 PM
To: user
Subject: FileNotFoundException in appcache shuffle files

My job is failing with the following error:

14/10/29 02:59:14 WARN scheduler.TaskSetManager: Lost task 1543.0 in stage 3.0 (TID 6266, demeter-csmau08-19.demeter.hpc.mssm.edu): java.io.FileNotFoundException: /data/05/dfs/dn/yarn/nm/usercache/willir31/appcache/application_1413512480649_0108/spark-local-20141028214722-43f1/26/shuffle_0_312_0.index (No such file or directory)
    java.io.FileOutputStream.open(Native Method)
    java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
    org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
    org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:733)
    org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:732)
    scala.collection.Iterator$class.foreach(Iterator.scala:727)
    org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:790)
    org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:732)
    org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:728)
    scala.collection.Iterator$class.foreach(Iterator.scala:727)
    scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:728)
    org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    org.apache.spark.scheduler.Task.run(Task.scala:56)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:744)

I get 4 of those on task 1543 before the job aborts. Interspersed in the 4 task-1543 failures are a few instances of this failure on another task. Here is the entire App Master stdout dump (https://www.dropbox.com/s/m8c4o73o0bh7kf8/adam.108?dl=0) [1] (~2MB; stack traces towards the bottom, of course).

I am running {Spark 1.1, Hadoop 2.3.0}. Here's a summary of the RDD manipulations I've done up to the point of failure:

* val A = [read a file in 1419 shards]
  * the file is 177GB compressed but ends up being ~5TB uncompressed / hydrated into scala objects (I think; see below for more discussion on this point).
  * some relevant Spark options:
    * spark.default.parallelism=2000
    * --master yarn-client
    * --executor-memory 50g
    * --driver-memory 10g
    * --num-executors 100
    * --executor-cores 4
* A.repartition(3000)
  * 3000 was chosen in an attempt to mitigate shuffle-disk-spillage that previous job attempts with 1000 or 1419 shards were mired in
GC Issues with randomSplit on large dataset
Hey all - not writing to necessarily get a fix but more to get an understanding of what's going on internally here. I wish to take a cross-product of two very large RDDs (using cartesian), the product of which is well in excess of what can be stored on disk. Clearly that is intractable, thus my solution is to do things in batches - essentially I can take the cross product of a small piece of the first data set with the entirety of the other. To do this, I calculate how many items can fit into 1 gig of memory. Next, I use RDD.randomSplit() to partition the first data set. The issue is that I am trying to partition an RDD of several million items into several million partitions. This throws the error below. I would like to understand the internals of what's going on here so that I can adjust my approach accordingly. Thanks in advance.

14/10/29 22:17:44 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-16] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at com.google.protobuf_spark.ByteString.toByteArray(ByteString.java:213)
    at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:24)
    at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
    at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOfRange(Arrays.java:2694)
    at java.lang.String.<init>(String.java:203)
    at java.lang.String.substring(String.java:1913)
    at java.lang.String.subSequence(String.java:1946)
    at java.util.regex.Matcher.getSubSequence(Matcher.java:1245)
    at java.util.regex.Matcher.group(Matcher.java:490)
    at java.util.Formatter$FormatSpecifier.<init>(Formatter.java:2675)
    at java.util.Formatter.parse(Formatter.java:2528)
    at java.util.Formatter.format(Formatter.java:2469)
    at java.util.Formatter.format(Formatter.java:2423)
    at java.lang.String.format(String.java:2790)
    at scala.collection.immutable.StringLike$class.format(StringLike.scala:266)
    at scala.collection.immutable.StringOps.format(StringOps.scala:31)
    at org.apache.spark.util.Utils$.getCallSite(Utils.scala:944)
    at org.apache.spark.rdd.RDD.<init>(RDD.scala:1227)
    at org.apache.spark.rdd.RDD.<init>(RDD.scala:83)
    at org.apache.spark.rdd.PartitionwiseSampledRDD.<init>(PartitionwiseSampledRDD.scala:47)
    at org.apache.spark.rdd.RDD$$anonfun$randomSplit$1.apply(RDD.scala:378)
    at org.apache.spark.rdd.RDD$$anonfun$randomSplit$1.apply(RDD.scala:377)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD.randomSplit(RDD.scala:379)
Building Spark against Cloudera 5.2.0 - Failure
Hello all,

I am attempting to manually build the master branch of Spark against Cloudera's 5.2.0 deployment. To do this I am running:

mvn -Pyarn -Dhadoop.version=2.5.0-cdh5.2.0 -DskipTests clean package

The build completes successfully and then I run:

mvn -Pyarn -Phadoop.version=2.5.0-cdh5.2.0 test

Both on the cluster and on the local deployment, however, this sequence fails. Locally, I have better luck with the tests but there are still failures. On the cluster I get some passed tests but also numerous failures. Summary is below:

[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM ... SUCCESS [ 2.481 s]
[INFO] Spark Project Core ... FAILURE [11:02 min]
[INFO] Spark Project Bagel ... SKIPPED
[INFO] Spark Project GraphX ... SKIPPED
[INFO] Spark Project Streaming ... SKIPPED
[INFO] Spark Project ML Library ... SKIPPED
[INFO] Spark Project Tools ... SKIPPED
[INFO] Spark Project Catalyst ... SKIPPED
[INFO] Spark Project SQL ... SKIPPED
[INFO] Spark Project Hive ... SKIPPED
[INFO] Spark Project REPL ... SKIPPED
[INFO] Spark Project YARN Parent POM ... SKIPPED
[INFO] Spark Project YARN Stable API ... SKIPPED
[INFO] Spark Project Assembly ... SKIPPED
[INFO] Spark Project External Twitter ... SKIPPED
[INFO] Spark Project External Kafka ... SKIPPED
[INFO] Spark Project External Flume Sink ... SKIPPED
[INFO] Spark Project External Flume ... SKIPPED
[INFO] Spark Project External ZeroMQ ... SKIPPED
[INFO] Spark Project External MQTT ... SKIPPED
[INFO] Spark Project Examples ... SKIPPED
[INFO]
[INFO] BUILD FAILURE
[INFO]
[INFO] Total time: 11:06 min
[INFO] Finished at: 2014-10-28T17:21:27-05:00
[INFO] Final Memory: 69M/301M
[INFO]
[WARNING] The requested profile hadoop.version=2.5.0-cdh5.2.0 could not be activated because it does not exist.
[ERROR] Failed to execute goal org.scalatest:scalatest-maven-plugin:1.0:test (test) on project spark-core_2.10: There are test failures - [Help 1]

What I would like to know is whether Spark tests have been successfully run against the Cloudera deployment?

Please let me know, thank you all.
Re: Spark-submt job Killed
Hi Ami - I suspect that your code is completing because you have nothing to actually force resolution of your job. Spark executes lazily, so for example, if you have a bunch of maps in sequence but nothing else, Spark will not actually execute anything. Try adding an RDD.count() on the last RDD that you generate with Spark to ensure that something is forcing execution at the end.

On 10/28/14, 5:32 PM, akhandeshi ami.khande...@gmail.com wrote:

I recently started seeing this new problem where spark-submit is terminated by a "Killed" message but no error message indicating what happened. I have enabled logging in the Spark configuration. Has anyone seen this or know how to troubleshoot it?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submt-job-Killed-tp17560.html
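The lazy-execution point can be seen without a cluster. The sketch below is an analogy only: it uses a plain Scala Iterator, whose map is also lazy, in place of an RDD, and consuming the iterator plays the role of an action such as RDD.count(). LazyDemo and run are hypothetical names.

```scala
object LazyDemo {
  // Returns (calls seen before forcing, calls seen after forcing, results).
  def run(): (Int, Int, List[Int]) = {
    var evaluated = 0
    // Building the pipeline runs nothing yet: Iterator.map is lazy.
    val pipeline = Iterator(1, 2, 3).map { x => evaluated += 1; x * 2 }
    val before = evaluated
    // Consuming the iterator is what finally runs the mapping function.
    val results = pipeline.toList
    (before, evaluated, results)
  }
}
```

Until the toList call the counter stays at zero, which is the same reason a chain of maps with no action at the end does no work in Spark.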
RE: Is it possible to call a transform + action inside an action?
You cannot have nested RDD transformations in Scala Spark. The issue is that when the outer operation is distributed to the cluster and kicks off a new job (the inner query), the inner job no longer has the context for the outer job. The way around this is either to do a join on two RDDs or to store a serializable lookup structure (not an RDD) in memory and have that sent to the nodes during execution. You can even do this efficiently by defining a broadcast variable. I apologize for not providing examples - am on my phone :)

-Original Message-
From: kpeng1 [kpe...@gmail.com]
Sent: Tuesday, October 28, 2014 06:34 PM Eastern Standard Time
To: u...@spark.incubator.apache.org
Subject: Is it possible to call a transform + action inside an action?

I am currently writing an application that uses Spark Streaming. What I am trying to do is basically read in a few files (I do this by using the Spark context's textFile) and then process those files inside an action that I apply to a streaming RDD. Here is the main code below:

def main(args: Array[String]) {
  val sparkConf = new SparkConf().setAppName("EmailIngestion")
  val ssc = new StreamingContext(sparkConf, Seconds(1))
  val sc = new SparkContext(sparkConf)
  val badWords = sc.textFile("/filters/badwords.txt")
  val urlBlacklist = sc.textFile("/filters/source_url_blacklist.txt")
  val domainBlacklist = sc.textFile("/filters/domain_blacklist.txt")
  val emailBlacklist = sc.textFile("/filters/blacklist.txt")
  val lines = FlumeUtils.createStream(ssc, "localhost", 4545, StorageLevel.MEMORY_ONLY_SER_2)
  lines.foreachRDD(rdd => rdd.foreachPartition(json => Processor.ProcessRecord(json, badWords, urlBlacklist, domainBlacklist, emailBlacklist)))
  ssc.start()
  ssc.awaitTermination()
}

Here is the code for processing the files found inside the ProcessRecord method:

val emailBlacklistCnt = emailBlacklist.filter(black => black.contains(email)).count

It looks like this throws an exception. Is it possible to do this?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-call-a-transform-action-inside-an-action-tp17568.html
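To make the "serializable lookup structure" suggestion concrete, here is a minimal sketch of the same blacklist check done against a local Set instead of an RDD. BlacklistLookup and matchCount are hypothetical names, and the Spark wiring is shown only as comments because it is an untested assumption about how the pieces in the question would fit together; it also assumes the blacklist files are small enough to collect to the driver.

```scala
object BlacklistLookup {
  // Same check as emailBlacklist.filter(black => black.contains(email)).count,
  // but against an in-memory Set, so it can run inside foreachPartition.
  def matchCount(email: String, blacklist: Set[String]): Int =
    blacklist.count(entry => entry.contains(email))
}

// Possible wiring on the driver (assumption, untested here):
//   val emailBlacklistBc = sc.broadcast(emailBlacklist.collect().toSet)
//   lines.foreachRDD(rdd => rdd.foreachPartition { records =>
//     val blacklist = emailBlacklistBc.value   // plain Set on the executor
//     // ... BlacklistLookup.matchCount(email, blacklist) per record ...
//   })
```

The key difference from the original code is that nothing inside the per-partition function refers to an RDD.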
RE: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
Have you checked for any global variables in your scope? Remember that even if variables are not passed to the function, they will be included as part of the context passed to the nodes. If you can't work out what is breaking, then try to simplify what you're doing. Set up a simple test call (like a map) with the same objects you're trying to serialize and see if those work.

-Original Message-
From: Steve Lewis [lordjoe2...@gmail.com]
Sent: Tuesday, October 28, 2014 10:46 PM Eastern Standard Time
To: user@spark.apache.org
Subject: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994

A cluster I am running on keeps getting KryoException. Unlike the Java serializer, the Kryo exception gives no clue as to what class is causing the error. The application runs properly locally but not on the cluster. I have my own custom KryoRegistrator and register several dozen classes - essentially everything I can find which implements Serializable. How do I find what the KryoSerializer issue is? I would love to see a list of all classes Kryo serialized.
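The remark about variables being dragged along with the context can be reproduced locally. The sketch below uses plain Java serialization (not Kryo, and not Spark's own closure handling) purely to show the capture effect: a function that touches a field captures the whole enclosing object, while one that copies the field into a local val captures only the value. ClosureCapture, Settings and serializes are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCapture {
  // Deliberately NOT Serializable, like many driver-side helper objects.
  class Settings(val offset: Int) {
    // Uses the field directly, so the function captures `this`.
    def capturesThis: Int => Int = (x: Int) => x + offset

    // Copies the field to a local val first, so only an Int is captured.
    def capturesLocal: Int => Int = {
      val localOffset = offset
      (x: Int) => x + localOffset
    }
  }

  // True if plain Java serialization of the function succeeds.
  def serializes(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(f)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

A quick check like serializes(...) on the functions and objects a job uses is one way to do the "simple test call" suggested above before involving the cluster.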
RE: Use RDD like a Iterator
Would RDD.map() do what you need? It will apply a function to every element of the RDD and return a resulting RDD.

-Original Message-
From: Zhan Zhang [zzh...@hortonworks.com]
Sent: Tuesday, October 28, 2014 11:23 PM Eastern Standard Time
To: Dai, Kevin
Cc: user@spark.apache.org
Subject: Re: Use RDD like a Iterator

I think it is already lazily computed, or do you mean something else? Following is the signature of compute in RDD:

def compute(split: Partition, context: TaskContext): Iterator[T]

Thanks.
Zhan Zhang

On Oct 28, 2014, at 8:15 PM, Dai, Kevin yun...@ebay.com wrote:

Hi, ALL

I have an RDD[T]. Can I use it like an iterator? That means I can compute every element of this RDD lazily.

Best Regards,
Kevin.

CONFIDENTIALITY NOTICE NOTICE: This message is intended for the use of the individual or entity to which it is addressed and may contain information that is confidential, privileged and exempt from disclosure under applicable law. If the reader of this message is not the intended recipient, you are hereby notified that any printing, copying, dissemination, distribution, disclosure or forwarding of this communication is strictly prohibited. If you have received this communication in error, please contact the sender immediately and delete it from your system. Thank You.
Re: MLLib ALS ArrayIndexOutOfBoundsException with Scala Spark 1.1.0
Hi Xiangrui - I can certainly save the data before ALS - that would be a great first step. Why would reducing the number of partitions help? I would very much like to understand what's happening internally. Also, with regard to Burak's earlier comment, here is the JIRA referencing this problem: https://issues.apache.org/jira/browse/SPARK-3080

On 10/27/14, 6:12 PM, Xiangrui Meng men...@gmail.com wrote:

Could you save the data before ALS and try to reproduce the problem? You might try reducing the number of partitions and not using Kryo serialization, just to narrow down the issue. -Xiangrui

On Mon, Oct 27, 2014 at 1:29 PM, Ilya Ganelin ilgan...@gmail.com wrote:

Hi Burak. I always see this error. I'm running the CDH 5.2 version of Spark 1.1.0. I load my data from HDFS. By the time it hits the recommender it has gone through many Spark operations.

On Oct 27, 2014 4:03 PM, Burak Yavuz bya...@stanford.edu wrote:

Hi, I've come across this multiple times, but not in a consistent manner. I found it hard to reproduce. I have a JIRA for it: SPARK-3080. Do you observe this error every single time? Where do you load your data from? Which version of Spark are you running? Figuring out the similarities may help in pinpointing the bug. Thanks, Burak

- Original Message - From: Ilya Ganelin ilgan...@gmail.com To: user user@spark.apache.org Sent: Monday, October 27, 2014 11:36:46 AM Subject: MLLib ALS ArrayIndexOutOfBoundsException with Scala Spark 1.1.0

Hello all - I am attempting to run MLLib's ALS algorithm on a substantial test vector - approx. 200 million records. I have resolved a few issues I've had with regard to garbage collection, Kryo serialization, and memory usage. However, I have not been able to get around the issue below:

java.lang.ArrayIndexOutOfBoundsException: 6106
    org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateBlock$1.apply$mcVI$sp(ALS.scala:543)
    scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$updateBlock(ALS.scala:537)
    org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:505)
    org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:504)
    org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)
    org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)
    scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:144)
    org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
    org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
    scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
    org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:229)

I do not have any negative indices or indices that exceed Int.MaxValue.
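For anyone wanting to double-check the index claim above before digging further, here is a minimal sketch in plain Scala. It assumes the raw records are (user, product, rating) tuples with Long IDs, prior to conversion into MLlib's Rating, which stores user and product as Int. The names IdRangeCheck, fitsInInt and outOfRange are hypothetical helpers and not part of Spark or MLlib. Passing this check only confirms the stated precondition; it says nothing about what happens inside ALS.

```scala
// Hypothetical helper, not part of Spark or MLlib; names are illustrative only.
object IdRangeCheck {

  // True when a raw ID is non-negative and can be stored in an Int without wrapping.
  def fitsInInt(id: Long): Boolean =
    id >= 0L && id <= Int.MaxValue.toLong

  // Returns the (user, product, rating) records whose user or product ID is out of range.
  // Assumption: raw IDs are available as Long before being narrowed to Int.
  def outOfRange(records: Seq[(Long, Long, Double)]): Seq[(Long, Long, Double)] =
    records.filterNot { case (user, product, _) =>
      fitsInInt(user) && fitsInInt(product)
    }
}
```

On an RDD of the same tuple type, the fitsInInt predicate could be applied through filter followed by count() to see how many records would be affected, without collecting the data to the driver.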
I have partitioned the input data into 300 partitions and my Spark config is below:

    .set("spark.executor.memory", "14g")
    .set("spark.storage.memoryFraction", "0.8")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", "MyRegistrator")
    .set("spark.core.connection.ack.wait.timeout", "600")
    .set("spark.akka.frameSize", "50")
    .set("spark.yarn.executor.memoryOverhead", "1024")

Does anyone have any suggestions as to why I'm seeing the above error or how to get around it? It may be possible to upgrade to the latest version of Spark, but the mechanism for doing so in our environment isn't obvious yet. -Ilya Ganelin