SPARK_MASTER_IP
A grep for SPARK_MASTER_IP shows that sbin/start-master.sh and sbin/start-slaves.sh are the only scripts that use it. Yet, for example, in CDH5 the Spark master is started from /etc/init.d/spark-master by running bin/spark-class. Does that mean SPARK_MASTER_IP is simply ignored? It looks that way to me. Is it supposed to be?
Re: How to initialize StateDStream
There's no need to initialize StateDStream. Take a look at the example StatefulNetworkWordCount.scala; it's part of the Spark source code.
Re: Spark and Scala
Is it always true that whenever we apply operations on an RDD, we get another RDD? Or does it depend on the return type of the operation?

On Sat, Sep 13, 2014 at 9:45 AM, Soumya Simanta soumya.sima...@gmail.com wrote:

An RDD is a fault-tolerant distributed structure. It is the primary abstraction in Spark. I would strongly suggest that you have a look at the following to get a basic idea:
http://www.cs.berkeley.edu/~pwendell/strataconf/api/core/spark/RDD.html
http://spark.apache.org/docs/latest/quick-start.html#basics
https://www.usenix.org/conference/nsdi12/technical-sessions/presentation/zaharia

On Sat, Sep 13, 2014 at 12:06 AM, Deep Pradhan pradhandeep1...@gmail.com wrote:

Take for example this: I have declared one queue, val queue = Queue.empty[Int], which is a pure Scala line in the program. I actually want the queue to be an RDD, but there are no direct methods to create an RDD which is a queue, right? What is your take on this? Does there exist something like "create an RDD which is a queue"?

On Sat, Sep 13, 2014 at 8:43 AM, Hari Shreedharan hshreedha...@cloudera.com wrote:

No, Scala primitives remain primitives. Unless you create an RDD using one of the many methods, you would not be able to access any of the RDD methods. There is no automatic porting. Spark is an application as far as Scala is concerned; there is no special compilation (except, of course, the Scala and JIT compilation etc.).

On Fri, Sep 12, 2014 at 8:04 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:

I know that unpersist is a method on RDD. But my confusion is that, when we port our Scala programs to Spark, doesn't everything change to RDDs?

On Fri, Sep 12, 2014 at 10:16 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:

unpersist is a method on RDDs. RDDs are abstractions introduced by Spark. An Int is just a Scala Int. You can't call unpersist on an Int in Scala, and that doesn't change in Spark.

On Fri, Sep 12, 2014 at 12:33 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:

There is one thing that I am confused about. Spark has code that has been implemented in Scala. Now, can we run any Scala code on the Spark framework? What will be the difference in the execution of the Scala code on normal systems and on Spark? The reason for my question is the following: I had a variable, val temp = some operations. This temp was being created inside a loop; to manually throw it out of the cache every time the loop ends, I was calling temp.unpersist(). This was returning an error saying that value unpersist is not a method of Int, which means that temp is an Int. Can someone explain to me why I was not able to call unpersist on temp? Thank you
Re: How to save mllib model to hdfs and reload it
Shixiong, these two snippets behave differently in Scala. In the second snippet, you define a variable named m and evaluate the right-hand side as part of the definition. In other words, the variable is replaced by the pre-computed value of Array(1.0) in the subsequent code. So in the second snippet, you do not need to serialize the class, and it works even in a distributed environment, because only the pre-computed value rather than the whole class is sent to the executor nodes.

2014-08-14 22:54 GMT+08:00 Shixiong Zhu zsxw...@gmail.com:

I think I can reproduce this error. The following code cannot work and reports that Foo cannot be serialized (log in gist https://gist.github.com/zsxwing/4f9f17201d4378fe3e16):

    class Foo { def foo() = Array(1.0) }
    val t = new Foo
    val m = t.foo
    val r1 = sc.parallelize(List(1, 2, 3))
    val r2 = r1.map(_ + m(0))
    r2.toArray

But the following code can work (log in gist https://gist.github.com/zsxwing/802cade0facb36a37656):

    class Foo { def foo() = Array(1.0) }
    var m: Array[Double] = null
    { val t = new Foo; m = t.foo }
    val r1 = sc.parallelize(List(1, 2, 3))
    val r2 = r1.map(_ + m(0))
    r2.toArray

Best Regards, Shixiong Zhu

2014-08-14 22:11 GMT+08:00 Christopher Nguyen c...@adatao.com:

Hi Hoai-Thu, the issue of a private default constructor is unlikely the cause here, since Lance was already able to load/deserialize the model object. And on that side topic, I wish all serdes libraries would just use constructor.setAccessible(true) by default :-) Most of the time that privacy is not about serdes reflection restrictions. Sent while mobile. Pls excuse typos etc.

On Aug 14, 2014 1:58 AM, Hoai-Thu Vuong thuv...@gmail.com wrote:

A man in this community gave me a video: https://www.youtube.com/watch?v=sPhyePwo7FA. I had asked the same question in this community and other guys helped me solve the problem. I was trying to load a MatrixFactorizationModel from an object file, but the compiler said that I could not create the object because the constructor is private. To solve this, I put my new object in the same package as MatrixFactorizationModel. Luckily, it works.

On Wed, Aug 13, 2014 at 9:20 PM, Christopher Nguyen c...@adatao.com wrote:

Lance, some debugging ideas: you might try model.predict(RDD[Vector]) to isolate the cause to serialization of the loaded model. And also try to serialize the deserialized (loaded) model manually to see if that throws any visible exceptions. Sent while mobile. Pls excuse typos etc.

On Aug 13, 2014 7:03 AM, lancezhange lancezha...@gmail.com wrote:

My prediction code is simple enough, as follows:

    val labelsAndPredsOnGoodData = goodDataPoints.map { point =>
      val prediction = model.predict(point.features)
      (point.label, prediction)
    }

When model is the loaded one, the above code just doesn't work. Can you catch the error? Thanks. PS. I use spark-shell in standalone mode, version 1.0.0

-- Thu.
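For Christopher's "serialize the loaded model manually" check, a few lines of plain Java serialization are enough; this is only a sketch, and checkSerializable is our own helper name:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Throws java.io.NotSerializableException naming the offending class
    // if anything in the object graph is not serializable.
    def checkSerializable(obj: AnyRef): Unit = {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
    }

    checkSerializable(model) // model: the deserialized (loaded) model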
Re: Spark and Scala
This is all covered in http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations

By definition, RDD transformations take an RDD to another RDD; actions produce some other type as a value on the driver program.

On Fri, Sep 12, 2014 at 11:15 PM, Deep Pradhan pradhandeep1...@gmail.com wrote:

Is it always true that whenever we apply operations on an RDD, we get another RDD? Or does it depend on the return type of the operation?
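To make the distinction concrete, a minimal sketch (the values are arbitrary):

    val rdd = sc.parallelize(1 to 5)
    val doubled = rdd.map(_ * 2)         // transformation: returns another RDD, lazily
    val total = doubled.reduce(_ + _)    // action: returns a plain Int to the driver
    val firstThree = doubled.take(3)     // action: returns an Array[Int] to the driver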
Re: Serving data
You can cache data in memory and query it using the Spark Job Server. Most folks dump data down to a queue/db for retrieval. You can also batch up data and store it into Parquet partitions, then query it using another Spark SQL shell; the JDBC driver in Spark SQL is part of 1.1, I believe.

-- Regards, Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi

On Fri, Sep 12, 2014 at 2:54 PM, Marius Soutier mps@gmail.com wrote:

Hi there, I'm pretty new to Spark, and so far I've written my jobs the same way I wrote Scalding jobs: one-off, read data from HDFS, count words, write counts back to HDFS. Now I want to display these counts in a dashboard. Since Spark allows you to cache RDDs in memory and you have to explicitly terminate your app (and there's even a new JDBC server in 1.1), I'm assuming it's possible to keep an app running indefinitely and query an in-memory RDD from the outside (via Spark SQL, for example). Is this how others are using Spark? Or are you just dumping job results into message queues or databases? Thanks - Marius
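A rough sketch of the "store as Parquet, query later from another app" option, using the Spark 1.1 API; the paths and the WordCount type here are made up:

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD

    case class WordCount(word: String, count: Int)
    val counts = sc.parallelize(Seq(WordCount("spark", 42)))

    // Write once from the batch job...
    counts.saveAsParquetFile("hdfs:///data/counts.parquet")

    // ...then, from another long-running app or shell:
    val loaded = sqlContext.parquetFile("hdfs:///data/counts.parquet")
    loaded.registerTempTable("counts")
    sqlContext.sql("SELECT word, count FROM counts WHERE count > 10").collect()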
Re: Spark and Scala
Take for example this:

    val lines = sc.textFile(args(0))
    val nodes = lines.map(s => {
      val fields = s.split("\\s+")
      (fields(0), fields(1))
    }).distinct().groupByKey().cache()
    val nodeSizeTuple = nodes.map(node => (node._1.toInt, node._2.size))
    val rootNode = nodeSizeTuple.top(1)(Ordering.by(f => f._2))

The nodeSizeTuple is an RDD, but rootNode is an array. Here I have used all RDD operations, but I am getting an array. What about this case?

On Sat, Sep 13, 2014 at 11:45 AM, Deep Pradhan pradhandeep1...@gmail.com wrote:

Is it always true that whenever we apply operations on an RDD, we get another RDD? Or does it depend on the return type of the operation?
Re: sc.textFile problem due to newlines within a CSV record
Thanks Xiangrui. This file already exists without escapes. I could probably try to preprocess it and add the escaping.

On Fri, Sep 12, 2014 at 9:38 PM, Xiangrui Meng men...@gmail.com wrote:

I wrote an input format for Redshift tables unloaded with UNLOAD's ESCAPE option: https://github.com/mengxr/redshift-input-format which can recognize multi-line records. Redshift puts a backslash before any in-record `\\`, `\r`, `\n`, and the delimiter character. You can apply the same escaping before calling saveAsTextFile, then use the input format to load them back. Xiangrui

On Fri, Sep 12, 2014 at 7:43 PM, Mohit Jaggi mohitja...@gmail.com wrote:

Folks, I think this might be due to the default TextInputFormat in Hadoop. Any pointers to solutions much appreciated. "More powerfully, you can define your own InputFormat implementations to format the input to your programs however you want. For example, the default TextInputFormat reads lines of text files. The key it emits for each record is the byte offset of the line read (as a LongWritable), and the value is the contents of the line up to the terminating '\n' character (as a Text object). If you have multi-line records each separated by a '$' character, you could write your own InputFormat that parses files into records split on this character instead." Thanks, Mohit
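A sketch of that custom-record-separator idea, assuming Hadoop 2.x (where the new-API LineRecordReader honors textinputformat.record.delimiter) and a made-up input path:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    val conf = new Configuration(sc.hadoopConfiguration)
    // Split records on '$' instead of '\n'.
    conf.set("textinputformat.record.delimiter", "$")

    val records = sc.newAPIHadoopFile("hdfs:///path/to/input",
        classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
      .map { case (_, text) => text.toString }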
Re: Spark and Scala
Again, RDD operations are of two basic varieties: transformations, which produce further RDDs, and actions, which return values to the driver program. You've used several RDD transformations and then finally the top(1) action, which returns an array of one element to your driver program. That is exactly what you should expect from the description of RDD#top in the API: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD

On Sat, Sep 13, 2014 at 12:34 AM, Deep Pradhan pradhandeep1...@gmail.com wrote:

The nodeSizeTuple is an RDD, but rootNode is an array. Here I have used all RDD operations, but I am getting an array. What about this case?
Re: [mllib] LogisticRegressionWithLBFGS interface is not consistent with LogisticRegressionWithSGD
I also found that https://github.com/apache/spark/commit/8f6e2e9df41e7de22b1d1cbd524e20881f861dd0 had resolved this issue, but it seems that the right code snippet does not occur in master or the 1.1 release.

2014-09-13 17:12 GMT+08:00 Yanbo Liang yanboha...@gmail.com:

Hi All, I found that the LogisticRegressionWithLBFGS interface is not consistent with LogisticRegressionWithSGD in master and the 1.1 release.
[mllib] LogisticRegressionWithLBFGS interface is not consistent with LogisticRegressionWithSGD
Hi All, I found that the LogisticRegressionWithLBFGS interface is not consistent with LogisticRegressionWithSGD in master and the 1.1 release. https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala#L199 In the above code snippet, users can only construct a LogisticRegressionWithLBFGS without any user-specified parameters. Users can specify training parameters by calling the corresponding functions of class LBFGS, but this behaves differently from LogisticRegressionWithSGD. Could anyone tell me why we did not refactor the code to keep the interface consistent? Thank you, Yanbo
Re: [mllib] LogisticRegressionWithLBFGS interface is not consistent with LogisticRegressionWithSGD
Hi Yanbo, We made the change here: https://github.com/apache/spark/commit/5d25c0b74f6397d78164b96afb8b8cbb1b15cfbd Those APIs for setting the parameters are very difficult to maintain, so we decided not to provide them. In the next release, Spark 1.2, we will have a better API design for parameter setting. Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai

On Sat, Sep 13, 2014 at 2:12 AM, Yanbo Liang yanboha...@gmail.com wrote:

Hi All, I found that the LogisticRegressionWithLBFGS interface is not consistent with LogisticRegressionWithSGD in master and the 1.1 release.
RDDs and Immutability
Hi, We all know that RDDs are immutable, and there do not seem to be enough operations to achieve anything and everything on RDDs. Take for example this: I want an Array of Bytes filled with zeros which should change during the program; some elements of that Array should change to 1. If I make an RDD with all elements as zero, I won't be able to change the elements. On the other hand, if I declare it as a plain Array, too much memory will be consumed. Please clarify this for me. Thank you
ReduceByKey performance optimisation
Hello, I am facing performance issues with reduceByKey. I know that this topic has already been covered, but I did not really find answers to my question. I am using reduceByKey to remove entries with identical keys, using (a, b) => a as the reduce function. It seems to be a relatively straightforward use of reduceByKey, but performance on moderately big RDDs (some tens of millions of lines) is very low, far from what you can reach with single-server computing packages like R, for example. I have read in other threads on the topic that reduceByKey always entirely shuffles the whole data. Is that true? Does it mean that custom partitioning could not help? In my case, I could relatively easily guarantee that two identical keys will always be on the same partition, so an option could be to use mapPartitions and reimplement reduce locally, but I would like to know if there are simpler / more elegant alternatives. Thanks for your help,
Re: Serving data
However, the cache is not guaranteed to remain: if other jobs are launched in the cluster and require more memory than what's left in the overall caching memory, previous RDDs will be discarded. Using an off-heap cache like Tachyon as a dump repo can help. In general, I'd say that using a persistent sink (like Cassandra, for instance) is best. My .2¢

aℕdy ℙetrella about.me/noootsab

On Sat, Sep 13, 2014 at 9:20 AM, Mayur Rustagi mayur.rust...@gmail.com wrote:

You can cache data in memory and query it using the Spark Job Server. Most folks dump data down to a queue/db for retrieval. You can also batch up data and store it into Parquet partitions, then query it using another Spark SQL shell.
Re: ReduceByKey performance optimisation
If you are just looking for distinct keys, .keys.distinct() should be much better.

On Sat, Sep 13, 2014 at 10:46 AM, Julien Carme julien.ca...@gmail.com wrote:

Hello, I am facing performance issues with reduceByKey. I am using reduceByKey to remove entries with identical keys, using (a, b) => a as the reduce function.
Re: JMXSink for YARN deployment
Hi, Jerry said he's guessing, so maybe the thing to try is to check whether his guess is correct. What about running sudo lsof | grep metrics.properties? I imagine you should be able to see the file if it was found and read; if Jerry is right, then I think you will NOT see it. Next, how about trying some bogus value in metrics.properties, like *.sink.jmx.class=org.apache.spark.metrics.sink.BogusSink? If the file is being read, then specifying such a bogus value should make something log an error or throw an exception at start, I assume. If you don't see this, then maybe the file is not being read at all.

Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/

On Thu, Sep 11, 2014 at 9:18 AM, Shao, Saisai saisai.s...@intel.com wrote:

Hi, I'm guessing the problem is that the driver or executor cannot find the metrics.properties configuration file in the YARN container, so the metrics system cannot load the right sinks. Thanks, Jerry

From: Vladimir Tretyakov [mailto:vladimir.tretya...@sematext.com] Sent: Thursday, September 11, 2014 7:30 PM To: user@spark.apache.org Subject: JMXSink for YARN deployment

Hello, we at Sematext (https://apps.sematext.com/) are writing a monitoring tool for Spark, and we came across one question: how do we enable JMX metrics for a YARN deployment? We put *.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink into the file $SPARK_HOME/conf/metrics.properties, but it doesn't work. Everything works in standalone mode, but not in YARN mode. Can somebody help? Thanks! PS: I've also found https://stackoverflow.com/questions/23529404/spark-on-yarn-how-to-send-metrics-to-graphite-sink/25786112 without an answer.
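If the file really is not reaching the containers, one thing worth trying (a sketch; the local path and app names are hypothetical) is shipping it explicitly with the job and pointing spark.metrics.conf at it:

    # Ship metrics.properties into every YARN container and tell Spark where it is.
    spark-submit \
      --master yarn-cluster \
      --files /local/path/metrics.properties \
      --conf spark.metrics.conf=metrics.properties \
      --class com.example.MyApp myapp.jar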
Re: ReduceByKey performance optimisation
I need to remove objects with duplicate keys, but I need the whole object. Objects which have the same key are not necessarily equal, though (but I can dump any of the ones that have an identical key).

2014-09-13 12:50 GMT+02:00 Sean Owen so...@cloudera.com:

If you are just looking for distinct keys, .keys.distinct() should be much better.
Re: How to initiate a shutdown of Spark Streaming context?
Your app is the running Spark Streaming system. It would be up to you to build some mechanism that lets you cause it to call stop() in response to some signal from you.

On Fri, Sep 12, 2014 at 3:59 PM, stanley wangshua...@yahoo.com wrote:

In the Spark Streaming programming guide https://spark.apache.org/docs/latest/streaming-programming-guide.html, it specifically states how to shut down a Spark Streaming context: "The existing application is shutdown gracefully (see StreamingContext.stop(...) or JavaStreamingContext.stop(...) for graceful shutdown options) which ensure data that have been received is completely processed before shutdown." However, my question is, how do I initiate a shutdown? Assume I am upgrading a running Spark Streaming system; how do I send a message to the running Spark Streaming instance so that the call to StreamingContext.stop(...) is made? Thanks, Stanley
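One common do-it-yourself mechanism is a marker file that the driver polls for; a minimal sketch, assuming an HDFS-visible flag path of your choosing (the path below is made up):

    import org.apache.hadoop.fs.{FileSystem, Path}

    val flag = new Path("hdfs:///tmp/stop-my-streaming-app") // hypothetical signal file
    val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)

    ssc.start()
    // Poll for the external stop signal instead of blocking in awaitTermination().
    while (!fs.exists(flag)) Thread.sleep(10000)
    // Stop gracefully: finish processing received data, then shut down.
    ssc.stop(stopSparkContext = true, stopGracefully = true)

Upgrading then becomes: create the flag file, wait for the old app to exit, and submit the new version.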
Re: spark 1.1 failure. class conflict?
No, your error is right there in the logs. Unset SPARK_CLASSPATH.

On Fri, Sep 12, 2014 at 10:20 PM, freedafeng freedaf...@yahoo.com wrote:

org.apache.spark.SparkException: Found both spark.driver.extraClassPath and SPARK_CLASSPATH. Use only the former.
Re: compiling spark source code
Hi, I too am having a problem compiling Spark from source. However, my problem is different. I downloaded the latest version (1.1.0) and ran ./sbt/sbt assembly from the command line. I end up with the following error:

[info] SHA-1: 20abd673d1e0690a6d5b64951868eef8d332d084
[info] Packaging /home/kkpt/Desktop/spark-1.1.0/examples/target/scala-2.10/spark-examples-1.1.0-hadoop1.0.4.jar ...
[info] Done packaging.
[error] (repl/compile:compile) Compilation failed
[error] Total time: 207 s, completed Sep 13, 2014 2:57:45 PM

Can you tell me how I could proceed? (I do not need Hadoop support.) Thanks in advance.
Re: How to initialize StateDStream
I had looked at that. If I have a set of saved word counts from a previous run and want to load them in the next run, what is the best way to do it? I am thinking of hacking the Spark code to have an initial RDD in StateDStream and use that for the first batch.

On Fri, Sep 12, 2014 at 11:04 PM, qihong qc...@pivotal.io wrote:

There's no need to initialize StateDStream. Take a look at the example StatefulNetworkWordCount.scala; it's part of the Spark source code.
Re: ReduceByKey performance optimisation
This is more concise:

    x.groupBy(_.fieldtobekey).values.map(_.head)

... but I doubt it's faster. If all objects with the same fieldtobekey are within the same partition, then yes, I imagine your biggest speedup comes from exploiting that. How about:

    x.mapPartitions(_.map(obj => (obj.fieldtobekey, obj)).toMap.values.iterator)

This does require that all keys, plus a representative object each, fit in memory. I bet you can make it faster than this example, too.

On Sat, Sep 13, 2014 at 1:15 PM, Gary Malouf malouf.g...@gmail.com wrote:

You need something like:

    val x: RDD[MyAwesomeObject]
    x.map(obj => obj.fieldtobekey -> obj).reduceByKey { case (l, _) => l }

Does that make sense?

On Sat, Sep 13, 2014 at 7:28 AM, Julien Carme julien.ca...@gmail.com wrote:

I need to remove objects with duplicate keys, but I need the whole object. Objects which have the same key are not necessarily equal, though (but I can dump any of the ones that have an identical key).
Re: RDDs and Immutability
Have you tried using RDD.map() to transform some of the RDD elements from 0 to 1? Why doesn't that work? That's how you change data in Spark: by defining a new RDD that's a transformation of an old one.

On Sat, Sep 13, 2014 at 5:39 AM, Deep Pradhan pradhandeep1...@gmail.com wrote:

Hi, We all know that RDDs are immutable. Take for example this: I want an Array of Bytes filled with zeros which should change during the program; some elements of that Array should change to 1.
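For instance, a minimal sketch of that approach (the positions to flip are hypothetical):

    val flips = Set(1L, 3L) // hypothetical indices whose value should become 1

    val zeros = sc.parallelize(Seq.fill[Byte](8)(0))
    // Derive a new RDD; the original stays immutable.
    val updated = zeros.zipWithIndex().map { case (b, i) =>
      if (flips.contains(i)) 1.toByte else b
    }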
Re: compiling spark source code
[error] (repl/compile:compile) Compilation failed

Can you pastebin more of the output? Cheers
Re: Nested Case Classes (Found and Required Same)
Upgraded to 1.1 and the issue is resolved. Thanks. I still wonder if there is a better way to approach a large attribute dataset.

On Fri, Sep 12, 2014 at 12:20 PM, Prashant Sharma scrapco...@gmail.com wrote:

What is your Spark version? This was fixed, I suppose. Can you try it with the latest release? Prashant Sharma

On Fri, Sep 12, 2014 at 9:47 PM, Ramaraju Indukuri iramar...@gmail.com wrote:

This is only a problem in the shell; it works fine in batch mode. I am also interested in how others are solving the problem of the case class limitation on the number of variables. Regards, Ram

On Fri, Sep 12, 2014 at 12:12 PM, iramaraju iramar...@gmail.com wrote:

I think this is a popular issue, but I need help figuring a way around it if it is unresolved. I have a dataset that has more than 70 columns. To have all the columns fit into my RDD, I am experimenting with the following. (I intend to use InputData to parse the file and have 3 or 4 column sets to accommodate the full list of variables.)

    case class ColumnSet(C1: Double, C2: Double, C3: Double)
    case class InputData(EQN: String, ts: String, Set1: ColumnSet, Set2: ColumnSet)
    val set1 = ColumnSet(1, 2, 3)
    val a = InputData("a", "a", set1, set1)

returns the following:

    console:16: error: type mismatch;
    found : ColumnSet
    required: ColumnSet
    val a = InputData("a", "a", set1, set1)

Whereas the same code works fine in my Scala console. Is there a workaround for my problem? Regards, Ram
Re: ReduceByKey performance optimisation
OK, mapPartitions seems to be the way to go. Thanks for the help!

On 13 Sep 2014 16:41, Sean Owen so...@cloudera.com wrote:

This is more concise: x.groupBy(_.fieldtobekey).values.map(_.head) ... but I doubt it's faster. If all objects with the same fieldtobekey are within the same partition, then yes, I imagine your biggest speedup comes from exploiting that.
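For reference, a self-contained sketch of that per-partition de-duplication, assuming records with the same key never cross partition boundaries (the Record type is made up):

    case class Record(key: String, payload: String) // hypothetical record type

    val records = sc.parallelize(Seq(
      Record("a", "first"), Record("a", "second"), Record("b", "only")))

    // Keep one arbitrary representative per key within each partition;
    // streams the partition instead of materializing a Map of whole objects.
    val deduped = records.mapPartitions { iter =>
      val seen = scala.collection.mutable.HashSet.empty[String]
      iter.filter(r => seen.add(r.key)) // add returns false if the key was already seen
    }

This only holds the keys of a partition in memory, not the objects themselves.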
Write 1 RDD to multiple output paths in one go
Howdy doody Spark Users,

I'd like to somehow write out a single RDD to multiple paths in one go. Here's an example. I have an RDD of (key, value) pairs like this:

    a = sc.parallelize(['Nick', 'Nancy', 'Bob', 'Ben', 'Frankie']).keyBy(lambda x: x[0])
    a.collect()
    [('N', 'Nick'), ('N', 'Nancy'), ('B', 'Bob'), ('B', 'Ben'), ('F', 'Frankie')]

Now I want to write the RDD out to different paths depending on the keys, so that I have one output directory per distinct key. Each output directory could potentially have multiple part- files or whatever. So my output would be something like:

    /path/prefix/n [/part-1, /part-2, etc]
    /path/prefix/b [/part-1, /part-2, etc]
    /path/prefix/f [/part-1, /part-2, etc]

How would you do that? I suspect I need to use saveAsNewAPIHadoopFile (http://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html#saveAsNewAPIHadoopFile) or saveAsHadoopFile (http://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html#saveAsHadoopFile) along with the MultipleTextOutputFormat output format class, but I'm not sure how. By the way, there is a very similar question to this here on Stack Overflow: http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job

Nick
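The Stack Overflow thread above sketches exactly that saveAsHadoopFile + MultipleTextOutputFormat route; a Scala version of the idea (class name ours, output path illustrative) might look like:

    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

    class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
      // Don't write the key into the file; the directory name already encodes it.
      override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
      // Route each record to a subdirectory named after its key.
      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
        key.toString.toLowerCase + "/" + name
    }

    val a = sc.parallelize(Seq("Nick", "Nancy", "Bob", "Ben", "Frankie"))
      .keyBy(_.substring(0, 1))
    a.saveAsHadoopFile("/path/prefix", classOf[String], classOf[String],
      classOf[RDDMultipleTextOutputFormat])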
Re: compiling spark source code
Hi Ted, Thanks for the prompt reply :) Please find details of the issue at this URL: http://pastebin.com/Xt0hZ38q Kind Regards
Re: How to initialize StateDStream
I'm not sure what you mean by previous run. Is it the previous batch, or a previous run of spark-submit? If it's the previous batch (Spark Streaming creates a batch every batch interval), then there's nothing to do. If it's a previous run of spark-submit (assuming you are able to save the result somewhere), then I can think of two possible ways to do it:

1. Read the saved result as an RDD (just do this once), and join that RDD with each RDD of the stateStream.

2. Add extra logic to the update function: when the previous state is None (one of the two Option type values), fetch the saved state for the given key from the saved result somehow, then apply your original logic to create the new state object based on Seq[V] and the previous state. Note that you need to use the version of the update function that makes the key available, updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)] (see the sketch below).
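A minimal sketch of option 2, assuming Int word counts, a hypothetical savedCounts map loaded once on the driver from the previous run's output, and wordStream being the DStream[(String, Int)] to track:

    val savedCounts: Map[String, Int] = Map() // placeholder: load from the saved result
    val bSaved = ssc.sparkContext.broadcast(savedCounts)

    val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
      iter.map { case (key, values, state) =>
        // Fall back to the previous run's count when no in-memory state exists yet.
        val prev = state.getOrElse(bSaved.value.getOrElse(key, 0))
        (key, prev + values.sum)
      }
    }

    val stateStream = wordStream.updateStateByKey[Int](updateFunc,
      new org.apache.spark.HashPartitioner(2), rememberPartitioner = true)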
Re: compiling spark source code
[error] File name too long

It is not clear which file(s) loadfiles was loading. Is the filename in an earlier part of the output? Cheers

On Sat, Sep 13, 2014 at 10:58 AM, kkptninja kkptni...@gmail.com wrote:

Hi Ted, Thanks for the prompt reply :) Please find details of the issue at this URL: http://pastebin.com/Xt0hZ38q Kind Regards
Workload for spark testing
Hi All: We know that some memory in Spark is used for computing (e.g., spark.shuffle.memoryFraction) and some is used for caching RDDs for future use (e.g., spark.storage.memoryFraction). Is there any existing workload which utilizes both of them during its running life cycle? I want to do some performance testing by adjusting the ratio between them. Thanks. -- Regards, Zhaojie
Re: compiling spark source code
Can you try sbt/sbt clean first?

On Sat, Sep 13, 2014 at 4:29 PM, Ted Yu yuzhih...@gmail.com wrote:

[error] File name too long

It is not clear which file(s) loadfiles was loading. Is the filename in an earlier part of the output? Cheers
Re: How to initialize StateDStream
Thanks for the pointers. I meant a previous run of spark-submit. On 1: this would add a bit more computation in every batch. On 2: it's a good idea, but it may be inefficient to retrieve each value. In general, for a generic state machine, the initialization and input sequence are critical for correctness.

On Sat, Sep 13, 2014 at 12:17 PM, qihong qc...@pivotal.io wrote:

I'm not sure what you mean by previous run. Is it the previous batch, or a previous run of spark-submit?
Re: spark 1.1.0 unit tests fail
Hi Koert, Thanks for reporting this. These tests have been flaky even on the master branch for a long time. You can safely disregard these test failures, as the root cause is port collisions from the many SparkContexts we create over the course of the entire test. There is a patch that fixes this, but it has not been back-ported into branch-1.1 yet. I will do that shortly. -Andrew

2014-09-13 17:27 GMT-07:00 Koert Kuipers ko...@tresata.com:

On Ubuntu 12.04 with 2 cores and 8G of RAM I see errors when I run the tests for Spark 1.1.0. Not sure how significant this is, since I used to see errors for Spark 1.0.0 too.

$ java -version
java version 1.6.0_43
Java(TM) SE Runtime Environment (build 1.6.0_43-b01)
Java HotSpot(TM) 64-Bit Server VM (build 20.14-b01, mixed mode)

$ mvn -version
Apache Maven 3.0.4
Maven home: /usr/share/maven
Java version: 1.6.0_43, vendor: Sun Microsystems Inc.
Java home: /usr/lib/jvm/jdk1.6.0_43/jre
Default locale: en_US, platform encoding: UTF-8
OS name: linux, version: 3.5.0-54-generic, arch: amd64, family: unix

$ export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
$ mvn clean package -DskipTests
$ mvn test

It is still running, and is very slow (and curiously with very low CPU usage, like 5%), but I already see the following errors:

DriverSuite:
- driver should exit after finishing *** FAILED ***
  TestFailedDueToTimeoutException was thrown during property evaluation. (DriverSuite.scala:40)
  Message: The code passed to failAfter did not complete within 60 seconds.
  Location: (DriverSuite.scala:41)
  Occurred at table row 0 (zero based, not counting headings), which had values (master = local)

SparkSubmitSuite:
- launch simple application with spark-submit *** FAILED ***
  org.apache.spark.SparkException: Process List(./bin/spark-submit, --class, org.apache.spark.deploy.SimpleApplicationTest, --name, testApp, --master, local, file:/tmp/1410653580697-0/testJar-1410653580697.jar) exited with code 1
  at org.apache.spark.util.Utils$.executeAndGetOutput(Utils.scala:872)
  at org.apache.spark.deploy.SparkSubmitSuite.runSparkSubmit(SparkSubmitSuite.scala:311)
  at org.apache.spark.deploy.SparkSubmitSuite$$anonfun$14.apply$mcV$sp(SparkSubmitSuite.scala:291)
  at org.apache.spark.deploy.SparkSubmitSuite$$anonfun$14.apply(SparkSubmitSuite.scala:284)
  at org.apache.spark.deploy.SparkSubmitSuite$$anonfun$14.apply(SparkSubmitSuite.scala:284)
  at org.scalatest.Transformer$$anonfun$apply$1.apply(Transformer.scala:22)
  at org.scalatest.Transformer$$anonfun$apply$1.apply(Transformer.scala:22)
  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  at org.scalatest.Transformer.apply(Transformer.scala:22)
  ...

- spark submit includes jars passed in through --jar *** FAILED ***
  org.apache.spark.SparkException: Process List(./bin/spark-submit, --class, org.apache.spark.deploy.JarCreationTest, --name, testApp, --master, local-cluster[2,1,512], --jars, file:/tmp/1410653674739-0/testJar-1410653674790.jar,file:/tmp/1410653674791-0/testJar-1410653674833.jar, file:/tmp/1410653674737-0/testJar-1410653674737.jar) exited with code 1
  at org.apache.spark.util.Utils$.executeAndGetOutput(Utils.scala:872)
  at org.apache.spark.deploy.SparkSubmitSuite.runSparkSubmit(SparkSubmitSuite.scala:311)
  at org.apache.spark.deploy.SparkSubmitSuite$$anonfun$15.apply$mcV$sp(SparkSubmitSuite.scala:305)
  at org.apache.spark.deploy.SparkSubmitSuite$$anonfun$15.apply(SparkSubmitSuite.scala:294)
  at org.apache.spark.deploy.SparkSubmitSuite$$anonfun$15.apply(SparkSubmitSuite.scala:294)
  at org.scalatest.Transformer$$anonfun$apply$1.apply(Transformer.scala:22)
  at org.scalatest.Transformer$$anonfun$apply$1.apply(Transformer.scala:22)
  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  at org.scalatest.Transformer.apply(Transformer.scala:22)
  ...
Spark SQL
val file = sc.textFile("hdfs://ec2-54-164-243-97.compute-1.amazonaws.com:9010/user/fin/events.txt")

1. val xyz = file.map(line => extractCurRate(sqlContext.sql("select rate from CurrencyCodeRates where txCurCode = '" + line.substring(202,205) + "' and fxCurCode = '" + fxCurCodesMap(line.substring(77,82)) + "' and effectiveDate = '" + line.substring(221,229) + "' order by effectiveDate desc")))

2. val xyz = file.map(line => sqlContext.sql("select rate, txCurCode, fxCurCode, effectiveDate from CurrencyCodeRates where txCurCode = 'USD' and fxCurCode = 'CSD' and effectiveDate = '20140901' order by effectiveDate desc"))

3. val xyz = sqlContext.sql("select rate, txCurCode, fxCurCode, effectiveDate from CurrencyCodeRates where txCurCode = 'USD' and fxCurCode = 'CSD' and effectiveDate = '20140901' order by effectiveDate desc")

xyz.saveAsTextFile("/user/output")

In statements 1 and 2 I'm getting a NullPointerException, but statement 3 is fine. I'm guessing the SparkContext and the SQLContext are not working well together (statements 1 and 2 call sqlContext.sql inside an RDD map). Any suggestions on how I can achieve this?