Re: Finding the number of executors.
Following is a method that retrieves the list of executors registered to a Spark context. It worked perfectly with spark-submit in standalone mode for my project.

/**
 * A simplified method that just returns the current active/registered executors,
 * excluding the driver.
 * @param sc The spark context to retrieve registered executors.
 * @return A list of executors, each in the form of host:port.
 */
def currentActiveExecutors(sc: SparkContext): Seq[String] = {
  val allExecutors = sc.getExecutorMemoryStatus.map(_._1)
  val driverHost: String = sc.getConf.get("spark.driver.host")
  allExecutors.filter(!_.split(":")(0).equals(driverHost)).toList
}

On Friday, August 21, 2015 1:53 PM, Virgil Palanciuc virg...@gmail.com wrote:

Hi Akhil, I'm using Spark 1.4.1. The number of executors is not in the command line and not in getExecutorMemoryStatus (I already mentioned that I tried that; it works in spark-shell but not when executed via spark-submit). I tried looking at defaultParallelism too; it's 112 (7 executors * 16 cores) when run via spark-shell, but just 2 when run via spark-submit. But the scheduler obviously knows this information. It *must* know it. How can I access it? Other than parsing the HTML of the WebUI, that is... that's pretty much guaranteed to work, and maybe I'll do that, but it's extremely convoluted. Regards, Virgil.

On Fri, Aug 21, 2015 at 11:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

Which version of Spark are you using? There was a discussion about this here: http://apache-spark-user-list.1001560.n3.nabble.com/Determine-number-of-running-executors-td19453.html and http://mail-archives.us.apache.org/mod_mbox/spark-user/201411.mbox/%3ccacbyxk+ya1rbbnkwjheekpnbsbh10rykuzt-laqgpdanvhm...@mail.gmail.com%3E

On Aug 21, 2015 7:42 AM, Virgil Palanciuc vir...@palanciuc.eu wrote:

Is there any reliable way to find out the number of executors programmatically, regardless of how the job is run? A method that preferably works for Spark standalone, YARN and Mesos, regardless of whether the code runs from the shell or not? Things that I tried that don't work:
- sparkContext.getExecutorMemoryStatus.size - 1 // works from the shell, does not work if the task is submitted via spark-submit
- sparkContext.getConf.getInt("spark.executor.instances", 1) // doesn't work unless explicitly configured
- a call to http://master:8080/json (this used to work, but doesn't anymore?)
I guess I could parse the output HTML from the Spark UI... but that seems dumb. Is there really no better way? Thanks, Virgil.
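For reference, a minimal, self-contained sketch of how the helper above might be called from a driver program (the app name, the sleep, and the main-class wrapper are illustrative assumptions, not part of the original post):

import org.apache.spark.{SparkConf, SparkContext}

object ExecutorCount {
  // Same helper as above, reproduced so the example compiles on its own.
  def currentActiveExecutors(sc: SparkContext): Seq[String] = {
    // getExecutorMemoryStatus is keyed by the executor's host:port
    val allExecutors = sc.getExecutorMemoryStatus.map(_._1)
    val driverHost: String = sc.getConf.get("spark.driver.host")
    // Drop the entry that belongs to the driver itself
    allExecutors.filter(!_.split(":")(0).equals(driverHost)).toList
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("executor-count"))
    // Executors may register a few seconds after startup; a short sleep is a crude workaround
    Thread.sleep(5000)
    val executors = currentActiveExecutors(sc)
    println(s"Registered executors (${executors.size}): ${executors.mkString(", ")}")
    sc.stop()
  }
}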
Re: Shuffle produces one huge partition and many tiny partitions
I got the same problem with rdd.repartition() in my streaming app, which generated a few huge partitions and many tiny partitions. The resulting data skew makes the processing time of a batch unpredictable and often longer than the batch interval. I eventually solved the problem by using rdd.coalesce() instead, which however is expensive as it generates a lot of shuffle traffic and also takes a long time. Du

On Thursday, June 18, 2015 1:00 AM, Al M alasdair.mcbr...@gmail.com wrote:

Thanks for the suggestion. Repartition didn't help us unfortunately. It still puts everything into the same partition. We did manage to improve the situation by making a new partitioner that extends HashPartitioner. It treats certain exception keys differently: keys that are known to appear very often are assigned random partitions instead of using the existing partitioning mechanism.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-produces-one-huge-partition-and-many-tiny-partitions-tp23358p23387.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
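For reference, a minimal sketch of the skew-aware partitioner idea Al M describes above (the hot-key set, the key type, and the class name are illustrative assumptions, not the original poster's code):

import scala.util.Random
import org.apache.spark.HashPartitioner

// Hashes normally, but scatters known hot keys to random partitions so a
// handful of dominant keys cannot all land in one huge partition.
class SkewAwarePartitioner(partitions: Int, hotKeys: Set[String])
    extends HashPartitioner(partitions) {

  override def getPartition(key: Any): Int = key match {
    // Keys known to dominate the data get a random partition each time
    case k: String if hotKeys.contains(k) => Random.nextInt(partitions)
    // Everything else falls back to plain hash partitioning
    case _ => super.getPartition(key)
  }
}

// Illustrative usage: pairRdd.partitionBy(new SkewAwarePartitioner(64, Set("", "null")))
// Note the trade-off: records for a hot key are no longer co-located, so this suits
// load spreading, not key-grouped aggregation.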
Re: Shuffle produces one huge partition and many tiny partitions
repartition() means coalesce(shuffle=false)

On Thursday, June 18, 2015 4:07 PM, Corey Nolet cjno...@gmail.com wrote:

Doesn't repartition call coalesce(shuffle=true)?

On Jun 18, 2015 6:53 PM, Du Li l...@yahoo-inc.com.invalid wrote:

I got the same problem with rdd.repartition() in my streaming app, which generated a few huge partitions and many tiny partitions. The resulting data skew makes the processing time of a batch unpredictable and often longer than the batch interval. I eventually solved the problem by using rdd.coalesce() instead, which however is expensive as it generates a lot of shuffle traffic and also takes a long time. Du

On Thursday, June 18, 2015 1:00 AM, Al M alasdair.mcbr...@gmail.com wrote:

Thanks for the suggestion. Repartition didn't help us unfortunately. It still puts everything into the same partition. We did manage to improve the situation by making a new partitioner that extends HashPartitioner. It treats certain exception keys differently: keys that are known to appear very often are assigned random partitions instead of using the existing partitioning mechanism.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Shuffle-produces-one-huge-partition-and-many-tiny-partitions-tp23358p23387.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
spark eventLog and history server
Event log is enabled in my spark streaming app. My code runs in standalone mode and the spark version is 1.3.1. I periodically stop and restart the streaming context by calling ssc.stop(). However, from the web UI, when clicking on a past job, it says the job is still in progress and does not show the event log. The event log files have suffix .inprogress. Removing the suffix does not solve the problem. Do I need to do anything here in order to view the event logs of finished jobs? Or do I need to stop ssc differently? In addition, the documentation seems to suggest history server is used for Mesos or YARN mode. Does it mean I don't need to start history server if I only use spark in standalone mode? Thanks,Du
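For reference, a minimal sketch of the configuration involved (paths, ports, and the batch interval are illustrative; assuming standalone mode and that the event log should lose its .inprogress suffix once the underlying SparkContext is stopped cleanly):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object EventLogDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("event-log-demo")
      .set("spark.eventLog.enabled", "true")
      // Illustrative path; must be readable by whatever serves the web UI / history server
      .set("spark.eventLog.dir", "hdfs:///user/spark/eventlog")

    val ssc = new StreamingContext(conf, Seconds(10))
    val lines = ssc.socketTextStream("localhost", 9999)   // illustrative source
    lines.count().print()
    ssc.start()

    // ... later, on shutdown. Stopping the SparkContext as well (first argument)
    // is what lets the event log be finalized rather than left in progress.
    ssc.stop(stopSparkContext = true, stopGracefully = true)
  }
}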
Re: how to use rdd.countApprox
Hi TD, Just to let you know: the job group and cancellation worked after I switched to Spark 1.3.1. I set a group id for rdd.countApprox() and cancel it, then set another group id for the remaining job of the foreachRDD but let it complete. As a by-product, I use the group id to indicate what the job does. :-) Thanks, Du

On Wednesday, May 13, 2015 4:23 PM, Tathagata Das t...@databricks.com wrote:

You might get stage information through SparkListener. But I am not sure whether you can use that information to easily kill stages. Though I highly recommend using Spark 1.3.1 (or even Spark master). Things move really fast between releases. 1.1.1 feels really old to me :P TD

On Wed, May 13, 2015 at 1:25 PM, Du Li l...@yahoo-inc.com wrote:

I do rdd.countApprox() and rdd.sparkContext.setJobGroup() inside dstream.foreachRDD{...}. After calling cancelJobGroup(), the spark context seems no longer valid, which crashes subsequent jobs. My Spark version is 1.1.1. I will do more investigation into this issue, perhaps after upgrading to 1.3.1, and then file a JIRA if it persists. Is there a way to get the stage or task id of a particular transformation or action on an RDD and then selectively kill the stage or tasks? It would be necessary and useful in situations similar to countApprox. Thanks, Du

On Wednesday, May 13, 2015 1:12 PM, Tathagata Das t...@databricks.com wrote:

That is not supposed to happen :/ That is probably a bug. If you have the log4j logs, it would be good to file a JIRA. This may be worth debugging.

On Wed, May 13, 2015 at 12:13 PM, Du Li l...@yahoo-inc.com wrote:

Actually I tried that before asking. However, it killed the spark context. :-) Du

On Wednesday, May 13, 2015 12:02 PM, Tathagata Das t...@databricks.com wrote:

That is a good question. I don't see a direct way to do that. You could try the following:

val jobGroupId = "group-id-based-on-current-time"
rdd.sparkContext.setJobGroup(jobGroupId, "approximate count")
val approxCount = rdd.countApprox(5000).initialValue // job launched with the set job group
rdd.sparkContext.cancelJobGroup(jobGroupId)          // cancel the job

On Wed, May 13, 2015 at 11:24 AM, Du Li l...@yahoo-inc.com wrote:

Hi TD, Do you know how to cancel the rdd.countApprox(5000) tasks after the timeout? Otherwise it keeps running until completion, producing results that are not used but consuming resources. Thanks, Du

On Wednesday, May 13, 2015 10:33 AM, Du Li l...@yahoo-inc.com.INVALID wrote:

Hi TD, Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming app stands a much better chance of completing the processing of each batch within the batch interval. Du

On Tuesday, May 12, 2015 10:31 PM, Tathagata Das t...@databricks.com wrote:

From the code it seems that as soon as rdd.countApprox(5000) returns, you can call pResult.initialValue() to get the approximate count at that point of time (that is, after the timeout). Calling pResult.getFinalValue() will block further until the job is over, and give the final correct values that you would have received from rdd.count().

On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote:

Hi, I tested the following in my streaming app and hoped to get an approximate count within 5 seconds. However, rdd.countApprox(5000).getFinalValue() seemed to always return only after it finished completely, just like rdd.count(), which often exceeded 5 seconds. The values for low, mean, and high were the same.

val pResult = rdd.countApprox(5000)
val bDouble = pResult.getFinalValue()
logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")

Can any expert here help explain the right way of usage? Thanks, Du

On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID wrote:

I have to count RDDs in a spark streaming app. When the data gets large, count() becomes expensive. Did anybody have experience using countApprox()? How accurate/reliable is it? The documentation is pretty modest. Suppose the timeout parameter is in milliseconds. Can I retrieve the count value by calling getFinalValue()? Does it block and return only after the timeout? Or do I need to define onComplete/onFail handlers to extract the count value from the partial results? Thanks, Du
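Putting the pieces of this thread together, a minimal sketch of the pattern reported to work on 1.3.1 (group-id strings, the 5000 ms timeout, and the method name are illustrative):

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

def approxCountEachBatch[T](stream: DStream[T]): Unit = {
  stream.foreachRDD { rdd =>
    val sc = rdd.sparkContext

    // Tag the approximate-count job with its own group so it can be cancelled
    val countGroup = s"countApprox-${System.currentTimeMillis}"
    sc.setJobGroup(countGroup, "approximate count with 5s timeout")
    val approx = rdd.countApprox(5000).initialValue   // returns after ~5s with a partial result
    sc.cancelJobGroup(countGroup)                     // stop the still-running count job

    // Use a different group for the real work so it is not cancelled
    sc.setJobGroup(s"process-${System.currentTimeMillis}", "main batch processing")
    println(s"approx count: low=${approx.low.toLong} mean=${approx.mean.toLong} high=${approx.high.toLong}")
    // ... rest of the batch processing ...
  }
}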
Re: how to use rdd.countApprox
I do rdd.countApprox() and rdd.sparkContext.setJobGroup() inside dstream.foreachRDD{...}. After calling cancelJobGroup(), the spark context seems no longer valid, which crashes subsequent jobs. My Spark version is 1.1.1. I will do more investigation into this issue, perhaps after upgrading to 1.3.1, and then file a JIRA if it persists. Is there a way to get the stage or task id of a particular transformation or action on an RDD and then selectively kill the stage or tasks? It would be necessary and useful in situations similar to countApprox. Thanks, Du

On Wednesday, May 13, 2015 1:12 PM, Tathagata Das t...@databricks.com wrote:

That is not supposed to happen :/ That is probably a bug. If you have the log4j logs, it would be good to file a JIRA. This may be worth debugging.

On Wed, May 13, 2015 at 12:13 PM, Du Li l...@yahoo-inc.com wrote:

Actually I tried that before asking. However, it killed the spark context. :-) Du

On Wednesday, May 13, 2015 12:02 PM, Tathagata Das t...@databricks.com wrote:

That is a good question. I don't see a direct way to do that. You could try the following:

val jobGroupId = "group-id-based-on-current-time"
rdd.sparkContext.setJobGroup(jobGroupId, "approximate count")
val approxCount = rdd.countApprox(5000).initialValue // job launched with the set job group
rdd.sparkContext.cancelJobGroup(jobGroupId)          // cancel the job

On Wed, May 13, 2015 at 11:24 AM, Du Li l...@yahoo-inc.com wrote:

Hi TD, Do you know how to cancel the rdd.countApprox(5000) tasks after the timeout? Otherwise it keeps running until completion, producing results that are not used but consuming resources. Thanks, Du

On Wednesday, May 13, 2015 10:33 AM, Du Li l...@yahoo-inc.com.INVALID wrote:

Hi TD, Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming app stands a much better chance of completing the processing of each batch within the batch interval. Du

On Tuesday, May 12, 2015 10:31 PM, Tathagata Das t...@databricks.com wrote:

From the code it seems that as soon as rdd.countApprox(5000) returns, you can call pResult.initialValue() to get the approximate count at that point of time (that is, after the timeout). Calling pResult.getFinalValue() will block further until the job is over, and give the final correct values that you would have received from rdd.count().

On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote:

Hi, I tested the following in my streaming app and hoped to get an approximate count within 5 seconds. However, rdd.countApprox(5000).getFinalValue() seemed to always return only after it finished completely, just like rdd.count(), which often exceeded 5 seconds. The values for low, mean, and high were the same.

val pResult = rdd.countApprox(5000)
val bDouble = pResult.getFinalValue()
logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")

Can any expert here help explain the right way of usage? Thanks, Du

On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID wrote:

I have to count RDDs in a spark streaming app. When the data gets large, count() becomes expensive. Did anybody have experience using countApprox()? How accurate/reliable is it? The documentation is pretty modest. Suppose the timeout parameter is in milliseconds. Can I retrieve the count value by calling getFinalValue()? Does it block and return only after the timeout? Or do I need to define onComplete/onFail handlers to extract the count value from the partial results? Thanks, Du
Re: how to use rdd.countApprox
Hi TD, Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming app stands a much better chance of completing the processing of each batch within the batch interval. Du

On Tuesday, May 12, 2015 10:31 PM, Tathagata Das t...@databricks.com wrote:

From the code it seems that as soon as rdd.countApprox(5000) returns, you can call pResult.initialValue() to get the approximate count at that point of time (that is, after the timeout). Calling pResult.getFinalValue() will block further until the job is over, and give the final correct values that you would have received from rdd.count().

On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote:

Hi, I tested the following in my streaming app and hoped to get an approximate count within 5 seconds. However, rdd.countApprox(5000).getFinalValue() seemed to always return only after it finished completely, just like rdd.count(), which often exceeded 5 seconds. The values for low, mean, and high were the same.

val pResult = rdd.countApprox(5000)
val bDouble = pResult.getFinalValue()
logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")

Can any expert here help explain the right way of usage? Thanks, Du

On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID wrote:

I have to count RDDs in a spark streaming app. When the data gets large, count() becomes expensive. Did anybody have experience using countApprox()? How accurate/reliable is it? The documentation is pretty modest. Suppose the timeout parameter is in milliseconds. Can I retrieve the count value by calling getFinalValue()? Does it block and return only after the timeout? Or do I need to define onComplete/onFail handlers to extract the count value from the partial results? Thanks, Du
Re: how to use rdd.countApprox
Hi TD, Do you know how to cancel the rdd.countApprox(5000) tasks after the timeout? Otherwise it keeps running until completion, producing results that are not used but consuming resources. Thanks, Du

On Wednesday, May 13, 2015 10:33 AM, Du Li l...@yahoo-inc.com.INVALID wrote:

Hi TD, Thanks a lot. rdd.countApprox(5000).initialValue worked! Now my streaming app stands a much better chance of completing the processing of each batch within the batch interval. Du

On Tuesday, May 12, 2015 10:31 PM, Tathagata Das t...@databricks.com wrote:

From the code it seems that as soon as rdd.countApprox(5000) returns, you can call pResult.initialValue() to get the approximate count at that point of time (that is, after the timeout). Calling pResult.getFinalValue() will block further until the job is over, and give the final correct values that you would have received from rdd.count().

On Tue, May 12, 2015 at 5:03 PM, Du Li l...@yahoo-inc.com.invalid wrote:

Hi, I tested the following in my streaming app and hoped to get an approximate count within 5 seconds. However, rdd.countApprox(5000).getFinalValue() seemed to always return only after it finished completely, just like rdd.count(), which often exceeded 5 seconds. The values for low, mean, and high were the same.

val pResult = rdd.countApprox(5000)
val bDouble = pResult.getFinalValue()
logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")

Can any expert here help explain the right way of usage? Thanks, Du

On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID wrote:

I have to count RDDs in a spark streaming app. When the data gets large, count() becomes expensive. Did anybody have experience using countApprox()? How accurate/reliable is it? The documentation is pretty modest. Suppose the timeout parameter is in milliseconds. Can I retrieve the count value by calling getFinalValue()? Does it block and return only after the timeout? Or do I need to define onComplete/onFail handlers to extract the count value from the partial results? Thanks, Du
Re: how to use rdd.countApprox
Hi, I tested the following in my streaming app and hoped to get an approximate count within 5 seconds. However, rdd.countApprox(5000).getFinalValue() seemed to always return only after it finished completely, just like rdd.count(), which often exceeded 5 seconds. The values for low, mean, and high were the same.

val pResult = rdd.countApprox(5000)
val bDouble = pResult.getFinalValue()
logInfo(s"countApprox().getFinalValue(): low=${bDouble.low.toLong}, mean=${bDouble.mean.toLong}, high=${bDouble.high.toLong}")

Can any expert here help explain the right way of usage? Thanks, Du

On Wednesday, May 6, 2015 7:55 AM, Du Li l...@yahoo-inc.com.INVALID wrote:

I have to count RDDs in a spark streaming app. When the data gets large, count() becomes expensive. Did anybody have experience using countApprox()? How accurate/reliable is it? The documentation is pretty modest. Suppose the timeout parameter is in milliseconds. Can I retrieve the count value by calling getFinalValue()? Does it block and return only after the timeout? Or do I need to define onComplete/onFail handlers to extract the count value from the partial results? Thanks, Du
how to use rdd.countApprox
I have to count RDD's in a spark streaming app. When data goes large, count() becomes expensive. Did anybody have experience using countApprox()? How accurate/reliable is it? The documentation is pretty modest. Suppose the timeout parameter is in milliseconds. Can I retrieve the count value by calling getFinalValue()? Does it block and return only after the timeout? Or do I need to define onComplete/onFail handlers to extract count value from the partial results? Thanks,Du
Re: RDD coalesce or repartition by #records or #bytes?
Hi, Spark experts: I did rdd.coalesce(numPartitions).saveAsSequenceFile(dir) in my code, which generates the RDDs in streamed batches. It generates numPartitions files as expected, with names dir/part-x. However, the first couple of files (e.g., part-0, part-1) have many times more records than the other files. This highly skewed distribution causes stragglers (and hence unpredictable execution time) and also the need to allocate memory for the worst case (because some of the records can be much larger than average). To solve this problem, I replaced coalesce(numPartitions) with repartition(numPartitions) or coalesce(numPartitions, shuffle=true), which are equivalent. As a result, the records are more evenly distributed over the output files and the execution time becomes more predictable. It of course incurs a lot of shuffle traffic. However, the GC time became prohibitively high, which crashed my app in just a few hours. Adding more memory to the executors didn't seem to help. Do you have any suggestions on how to spread the data without the GC costs? Does repartition() redistribute/shuffle every record by hash partitioner? Why does it drive the GC time so high? Thanks, Du

On Wednesday, March 4, 2015 5:39 PM, Zhan Zhang zzh...@hortonworks.com wrote:

It uses a HashPartitioner to distribute the records to different partitions, but the key is just an integer spread evenly across the output partitions. From the code, each resulting partition will get a very similar number of records. Thanks. Zhan Zhang

On Mar 4, 2015, at 3:47 PM, Du Li l...@yahoo-inc.com.INVALID wrote:

Hi, My RDDs are created from a Kafka stream. After receiving an RDD, I want to coalesce/repartition it so that the data will be processed in a set of machines in parallel, as evenly as possible. The number of processing nodes is larger than the number of receiving nodes. My question is how the coalesce/repartition works. Does it distribute by the number of records or the number of bytes? In my app, my observation is that the distribution seems to be by number of records. The consequence is, however, that some executors have to process 1000x as much data when the sizes of records are very skewed. Then we have to allocate memory for the worst case. Is there a way to programmatically affect the coalesce/repartition scheme? Thanks, Du
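For reference, a minimal sketch contrasting the two variants discussed above (the directory layout, batch handling, and use of saveAsTextFile instead of saveAsSequenceFile are illustrative); repartition(n) is simply coalesce(n, shuffle = true):

import org.apache.spark.streaming.dstream.DStream

// Variant 1: coalesce without shuffle -- cheap, but keeps whatever skew the
// receiving partitions already have (the part-0/part-1 hotspots described above).
def saveCoalesced[T](stream: DStream[T], dir: String, n: Int): Unit =
  stream.foreachRDD((rdd, time) =>
    rdd.coalesce(n).saveAsTextFile(s"$dir/${time.milliseconds}"))

// Variant 2: full shuffle -- records are spread evenly across n partitions,
// at the cost of shuffle traffic (and, per this thread, GC pressure).
def saveRepartitioned[T](stream: DStream[T], dir: String, n: Int): Unit =
  stream.foreachRDD((rdd, time) =>
    rdd.repartition(n).saveAsTextFile(s"$dir/${time.milliseconds}"))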
set up spark cluster with heterogeneous hardware
Hi Spark community, I searched for a way to configure a heterogeneous cluster because the need recently emerged in my project. I didn't find any solution out there, so I worked one out and thought it might be useful to many other people with similar needs. Following is a blog post I just wrote; feedback is welcome. The capability to accommodate heterogeneous hardware has already been there in Spark; we just need to learn how to use it. The Oscar goes to those contributors. :-) Du

How to set up a Spark cluster with heterogeneous hardware (on scala4fun.tumblr.com): Apache Spark (http://spark.apache.org) is a big data processing platform that has become widely adopted across industries in just a few years. The strength of Spark is to make use of main memory, which is getting larger and cheaper, to support low-latency use cases. In a pra…
Re: How to use more executors
Is it possible to extend this PR further (or create another PR) to allow for per-node configuration of workers? There have been many discussions about heterogeneous Spark clusters. Currently the configuration on the master overrides those on the workers. Many Spark users need machines with different CPU/memory capacities in the same cluster. Du

On Wednesday, January 21, 2015 3:59 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

…not sure when it will be reviewed… but for now you can work around it by allowing multiple worker instances on a single machine: http://spark.apache.org/docs/latest/spark-standalone.html (search for SPARK_WORKER_INSTANCES). Best, -- Nan Zhu, http://codingcat.me

On Wednesday, January 21, 2015 at 6:50 PM, Larry Liu wrote:

Will SPARK-1706 be included in the next release?

On Wed, Jan 21, 2015 at 2:50 PM, Ted Yu yuzhih...@gmail.com wrote:

Please see SPARK-1706.

On Wed, Jan 21, 2015 at 2:43 PM, Larry Liu larryli...@gmail.com wrote:

I tried to submit a job with --conf spark.cores.max=6 or --total-executor-cores 6 on a standalone cluster. But I don't see more than 1 executor on each worker. I am wondering how to use multiple executors when submitting jobs. Thanks, Larry
Re: How to use more executors
Is it being merged in the next release? It's indeed a critical patch! Du

On Wednesday, January 21, 2015 3:59 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

…not sure when it will be reviewed… but for now you can work around it by allowing multiple worker instances on a single machine: http://spark.apache.org/docs/latest/spark-standalone.html (search for SPARK_WORKER_INSTANCES). Best, -- Nan Zhu, http://codingcat.me

On Wednesday, January 21, 2015 at 6:50 PM, Larry Liu wrote:

Will SPARK-1706 be included in the next release?

On Wed, Jan 21, 2015 at 2:50 PM, Ted Yu yuzhih...@gmail.com wrote:

Please see SPARK-1706.

On Wed, Jan 21, 2015 at 2:43 PM, Larry Liu larryli...@gmail.com wrote:

I tried to submit a job with --conf spark.cores.max=6 or --total-executor-cores 6 on a standalone cluster. But I don't see more than 1 executor on each worker. I am wondering how to use multiple executors when submitting jobs. Thanks, Larry
Re: FW: RE: distribution of receivers in spark streaming
Thanks TD and Jerry for the suggestions. I have done some experiments and worked out a reasonable solution to the problem of spreading receivers over a set of worker hosts. It would be a bit too tedious to document in email, so I discuss the solution in a blog post: http://scala4fun.tumblr.com/post/113172936582/how-to-distribute-receivers-over-worker-hosts-in Please feel free to give me feedback if you see any issue. Thanks, Du

On Friday, March 6, 2015 4:10 PM, Tathagata Das t...@databricks.com wrote:

Aaah, good point, about the same node. All right. Can you post this on the user mailing list for future reference by the community? Might be a good idea to post both methods with pros and cons, as different users may have different constraints. :) Thanks :) TD

On Fri, Mar 6, 2015 at 4:07 PM, Du Li l...@yahoo-inc.com wrote:

Yes, but the caveat may not exist if we do this when the streaming app is launched, since we're trying to start the receivers before any other tasks. Hosts discovered in this way will only include the worker hosts. By using the API we may need some extra effort to single out the worker hosts. Sometimes we run the master and worker daemons on the same host and sometimes we don't, depending on configuration. Du

On Friday, March 6, 2015 3:59 PM, Tathagata Das t...@databricks.com wrote:

That can definitely be done. In fact I have done that. Just one caveat: if one of the executors is fully occupied with a previous very long job, then these fake tasks may not capture that worker even if there are lots of tasks. The executor storage status will work for sure, as long as you can filter out the master. TD

On Fri, Mar 6, 2015 at 3:49 PM, Du Li l...@yahoo-inc.com wrote:

Hi TD, Thanks for your response and the information. I just tried out the SparkContext getExecutorMemoryStatus and getExecutorStorageStatus methods. Due to their purposes, they do not differentiate master and worker nodes. However, for the performance of my app, I prefer to distribute receivers only to the worker nodes. This morning I worked out another solution: create an accumulator and a fake workload, and parallelize the workload with a high level of parallelism; each task does nothing but add its hostname to the accumulator. Repeat this until the accumulator value stops growing. In the end I get the set of worker hostnames. It worked pretty well! Thanks, Du

On Friday, March 6, 2015 3:11 PM, Tathagata Das t...@databricks.com wrote:

What Saisai said is correct. There is no good API. However, there are hacky ways of finding out the current workers. See sparkContext.getExecutorStorageStatus(); you can get the host names of the current executors. You could use those. TD

On Thu, Mar 5, 2015 at 6:55 AM, Du Li l...@yahoo-inc.com wrote:

Hi TD. Do you have any suggestion? Thanks /Du (Sent from Yahoo Mail for iPhone)

Begin forwarded message:
From: Shao, Saisai saisai.s...@intel.com
Date: Mar 4, 2015, 10:35:44 PM
To: Du Li l...@yahoo-inc.com, User user@spark.apache.org
Subject: RE: distribution of receivers in spark streaming

Yes, hostname is enough. I think currently it is hard for user code to get the worker list from the standalone master. If you can get the Master object, you could get the worker list, but AFAIK it may be difficult to get this object. All you could do is to manually get the worker list and assign a hostname to each receiver. Thanks, Jerry

From: Du Li [mailto:l...@yahoo-inc.com]
Sent: Thursday, March 5, 2015 2:29 PM
To: Shao, Saisai; User
Subject: Re: distribution of receivers in spark streaming

Hi Jerry, Thanks for your response. Is there a way to get the list of currently registered/live workers? Even in order to provide preferredLocation, it would be safer to know which workers are active. I guess I only need to provide the hostname, right? Thanks, Du

On Wednesday, March 4, 2015 10:08 PM, Shao, Saisai saisai.s...@intel.com wrote:

Hi Du, You could try to sleep for several seconds after creating the streaming context to let all the executors register; then the receivers can be distributed over the nodes more evenly. Setting locality is another way, as you mentioned. Thanks, Jerry

From: Du Li [mailto:l...@yahoo-inc.com.INVALID]
Sent: Thursday, March 5, 2015 1:50 PM
To: User
Subject: Re: distribution of receivers in spark streaming

Figured it out: I need to override method preferredLocation() in MyReceiver class.

On Wednesday, March 4, 2015 3:35 PM, Du Li l...@yahoo-inc.com.INVALID wrote:

Hi, I have a set of machines (say 5) and want to evenly launch a number (say 8) of Kafka receivers on those machines. In my code I did something like the following, as suggested in the Spark docs:

val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver()))
ssc.union(streams)

However, from the Spark UI, I saw that some machines are not running any instance of the receiver while some get three.
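For reference, a minimal sketch of the accumulator-plus-fake-workload approach described above (the parallelism level, function name, and retry loop are illustrative):

import scala.collection.mutable
import org.apache.spark.SparkContext

// Run a highly parallel no-op job whose only side effect is recording each
// executor's hostname in an accumulator, and repeat until no new hosts show up.
def discoverWorkerHosts(sc: SparkContext, parallelism: Int = 1000): Set[String] = {
  var hosts = Set.empty[String]
  var grew = true
  while (grew) {
    val acc = sc.accumulableCollection(mutable.HashSet[String]())
    sc.parallelize(1 to parallelism, parallelism)
      .foreach(_ => acc += java.net.InetAddress.getLocalHost.getHostName)
    val found = acc.value.toSet
    grew = (found -- hosts).nonEmpty
    hosts ++= found
  }
  hosts
}

As noted in the thread, this captures only the hosts that actually ran tasks, so a worker that is fully busy with a long-running job can be missed.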
distribution of receivers in spark streaming
Hi, I have a set of machines (say 5) and want to evenly launch a number (say 8) of Kafka receivers on those machines. In my code I did something like the following, as suggested in the Spark docs:

val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver()))
ssc.union(streams)

However, from the Spark UI, I saw that some machines are not running any instance of the receiver while some get three. The mapping changed every time the system was restarted. This impacts the receiving and also the processing speeds. I wonder if it's possible to control/suggest the distribution so that it would be more even. How is the decision made in Spark? Thanks, Du
RDD coalesce or repartition by #records or #bytes?
Hi, My RDDs are created from a Kafka stream. After receiving an RDD, I want to coalesce/repartition it so that the data will be processed in a set of machines in parallel, as evenly as possible. The number of processing nodes is larger than the number of receiving nodes. My question is how the coalesce/repartition works. Does it distribute by the number of records or the number of bytes? In my app, my observation is that the distribution seems to be by number of records. The consequence is, however, that some executors have to process 1000x as much data when the sizes of records are very skewed. Then we have to allocate memory for the worst case. Is there a way to programmatically affect the coalesce/repartition scheme? Thanks, Du
Re: distribution of receivers in spark streaming
Figured it out: I need to override method preferredLocation() in MyReceiver class.

On Wednesday, March 4, 2015 3:35 PM, Du Li l...@yahoo-inc.com.INVALID wrote:

Hi, I have a set of machines (say 5) and want to evenly launch a number (say 8) of Kafka receivers on those machines. In my code I did something like the following, as suggested in the Spark docs:

val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver()))
ssc.union(streams)

However, from the Spark UI, I saw that some machines are not running any instance of the receiver while some get three. The mapping changed every time the system was restarted. This impacts the receiving and also the processing speeds. I wonder if it's possible to control/suggest the distribution so that it would be more even. How is the decision made in Spark? Thanks, Du
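For reference, a minimal sketch of a receiver that overrides preferredLocation (Spark 1.x Receiver API); the host assignment is illustrative and the actual Kafka consumption logic is omitted:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A receiver pinned to a specific worker host via preferredLocation.
class MyKafkaReceiver(host: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) {

  // Ask the scheduler to launch this receiver on the given worker host
  override def preferredLocation: Option[String] = Some(host)

  def onStart(): Unit = { /* start threads that consume Kafka and call store(...) */ }
  def onStop(): Unit = { /* stop consumer threads */ }
}

// Illustrative wiring: spread 8 receivers round-robin over 5 known worker hosts.
// val hosts = Seq("worker1", "worker2", "worker3", "worker4", "worker5")
// val streams = (1 to 8).map(i => ssc.receiverStream(new MyKafkaReceiver(hosts(i % hosts.size))))
// ssc.union(streams)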
Re: distribution of receivers in spark streaming
Hi Jerry, Thanks for your response. Is there a way to get the list of currently registered/live workers? Even in order to provide preferredLocation, it would be safer to know which workers are active. I guess I only need to provide the hostname, right? Thanks, Du

On Wednesday, March 4, 2015 10:08 PM, Shao, Saisai saisai.s...@intel.com wrote:

Hi Du, You could try to sleep for several seconds after creating the streaming context to let all the executors register; then the receivers can be distributed over the nodes more evenly. Setting locality is another way, as you mentioned. Thanks, Jerry

From: Du Li [mailto:l...@yahoo-inc.com.INVALID]
Sent: Thursday, March 5, 2015 1:50 PM
To: User
Subject: Re: distribution of receivers in spark streaming

Figured it out: I need to override method preferredLocation() in MyReceiver class.

On Wednesday, March 4, 2015 3:35 PM, Du Li l...@yahoo-inc.com.INVALID wrote:

Hi, I have a set of machines (say 5) and want to evenly launch a number (say 8) of Kafka receivers on those machines. In my code I did something like the following, as suggested in the Spark docs:

val streams = (1 to numReceivers).map(_ => ssc.receiverStream(new MyKafkaReceiver()))
ssc.union(streams)

However, from the Spark UI, I saw that some machines are not running any instance of the receiver while some get three. The mapping changed every time the system was restarted. This impacts the receiving and also the processing speeds. I wonder if it's possible to control/suggest the distribution so that it would be more even. How is the decision made in Spark? Thanks, Du
Re: RDD saveAsObjectFile write to local file and HDFS
Add "file://" in front of your path.

On 11/26/14, 10:15 AM, firemonk9 dhiraj.peech...@gmail.com wrote:

When I am running Spark locally, RDD saveAsObjectFile writes the file to the local file system (e.g., path /data/temp.txt), and when I am running Spark on a YARN cluster, RDD saveAsObjectFile writes the file to HDFS (e.g., path /data/temp.txt). Is there a way to explicitly specify the local file system instead of HDFS when running on a YARN cluster?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/RDD-saveAsObjectFile-write-to-local-file-and-HDFS-tp19898.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
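For reference, a minimal runnable sketch (the app name and paths are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object SaveLocally {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("save-locally"))
    val rdd = sc.parallelize(1 to 100)
    // A fully qualified URI selects the filesystem regardless of fs.defaultFS.
    // Note: on a cluster, each executor writes its partitions to its own local disk.
    rdd.saveAsObjectFile("file:///tmp/demo_obj")   // local filesystem (illustrative path)
    rdd.saveAsObjectFile("hdfs:///tmp/demo_obj")   // HDFS explicitly (illustrative path)
    sc.stop()
  }
}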
Re: SparkSQL performance
We have seen all kinds of results published that often contradict each other. My take is that the authors often know more tricks about how to tune their own/familiar products than the others. So the product on focus is tuned for ideal performance while the competitors are not. The authors are not necessarily biased but as a consequence the results are. Ideally it’s critical for the user community to be informed of all the in-depth tuning tricks of all products. However, realistically, there is a big gap in terms of documentation. Hope the Spark folks will make a difference. :-) Du From: Soumya Simanta soumya.sima...@gmail.commailto:soumya.sima...@gmail.com Date: Friday, October 31, 2014 at 4:04 PM To: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: SparkSQL performance I was really surprised to see the results here, esp. SparkSQL not completing http://www.citusdata.com/blog/86-making-postgresql-scale-hadoop-style I was under the impression that SparkSQL performs really well because it can optimize the RDD operations and load only the columns that are required. This essentially means in most cases SparkSQL should be as fast as Spark is. I would be very interested to hear what others in the group have to say about this. Thanks -Soumya
Re: [SPARK SQL] kerberos error when creating database from beeline/ThriftServer2
) at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360) at org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:103) at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:98) at org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager$$anon$1.run(SparkSQLOperationManager.scala:172) at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:193) at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatement(HiveSessionImpl.java:175) at org.apache.hive.service.cli.CLIService.executeStatement(CLIService.java:150) at org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:207) at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1133) at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1118) at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39) at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) at org.apache.hive.service.auth.TUGIContainingProcessor$1.run(TUGIContainingProcessor.java:58) at org.apache.hive.service.auth.TUGIContainingProcessor$1.run(TUGIContainingProcessor.java:55) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1637) at org.apache.hadoop.hive.shims.HadoopShimsSecure.doAs(HadoopShimsSecure.java:526) at org.apache.hive.service.auth.TUGIContainingProcessor.process(TUGIContainingProcessor.java:55) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:206) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:722) Caused by: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS] at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:657) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1637) at org.apache.hadoop.ipc.Client$Connection.handleSaslConnectionFailure(Client.java:621) at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:712) at org.apache.hadoop.ipc.Client$Connection.access$2900(Client.java:368) at org.apache.hadoop.ipc.Client.getConnection(Client.java:1423) at org.apache.hadoop.ipc.Client.call(Client.java:1342) ... 71 more Caused by: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS] at org.apache.hadoop.security.SaslRpcClient.selectSaslClient(SaslRpcClient.java:171) at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:388) at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:702) at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:698) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1637) at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:697) ... 
74 more

From: Cheng Lian lian.cs@gmail.com
Date: Tuesday, October 28, 2014 at 2:50 AM
To: Du Li l...@yahoo-inc.com.invalid
Cc: user@spark.apache.org
Subject: Re: [SPARK SQL] kerberos error when creating database from beeline/ThriftServer2

Which version of Spark and Hadoop are you using? Could you please provide the full stack trace of the exception?

On Tue, Oct 28, 2014 at 5:48 AM, Du Li l...@yahoo-inc.com.invalid wrote:

Hi, I was trying to set up Spark SQL on a private cluster. I configured a hive-site.xml under spark/conf that uses a local metastore with the warehouse and default FS name set to HDFS on one of my corporate clusters. Then I started the Spark master, worker, and thrift server. However, when creating a database from beeline, I got the following error:

org.apache.hive.service.cli.HiveSQLException: org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:Got exception: java.io.IOException Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: "spark-master-host"; destination host is: "HDFS-namenode":port; )
Re: [SPARK SQL] kerberos error when creating database from beeline/ThriftServer2
If I put all the jar files from my local hive in the front of the spark class path, a different error was reported, as follows: 14/10/28 18:29:40 ERROR transport.TSaslTransport: SASL negotiation failure javax.security.sasl.SaslException: PLAIN auth failed: null at org.apache.hadoop.security.SaslPlainServer.evaluateResponse(SaslPlainServer.java:108) at org.apache.thrift.transport.TSaslTransport$SaslParticipant.evaluateChallengeOrResponse(TSaslTransport.java:528) at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:272) at org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41) at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:190) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:722) 14/10/28 18:29:40 ERROR server.TThreadPoolServer: Error occurred during processing of message. java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: PLAIN auth failed: null at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:190) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:722) Caused by: org.apache.thrift.transport.TTransportException: PLAIN auth failed: null at org.apache.thrift.transport.TSaslTransport.sendAndThrowMessage(TSaslTransport.java:221) at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:305) at org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41) at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216) ... 4 more From: Cheng Lian lian.cs@gmail.commailto:lian.cs@gmail.com Date: Tuesday, October 28, 2014 at 2:50 AM To: Du Li l...@yahoo-inc.com.invalidmailto:l...@yahoo-inc.com.invalid Cc: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: [SPARK SQL] kerberos error when creating database from beeline/ThriftServer2 Which version of Spark and Hadoop are you using? Could you please provide the full stack trace of the exception? On Tue, Oct 28, 2014 at 5:48 AM, Du Li l...@yahoo-inc.com.invalidmailto:l...@yahoo-inc.com.invalid wrote: Hi, I was trying to set up Spark SQL on a private cluster. I configured a hive-site.xml under spark/conf that uses a local metestore with warehouse and default FS name set to HDFS on one of my corporate cluster. Then I started spark master, worker and thrift server. However, when creating a database on beeline, I got the following error: org.apache.hive.service.cli.HiveSQLException: org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. 
MetaException(message:Got exception: java.io.IOException Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: "spark-master-host"; destination host is: "HDFS-namenode":port; )

It occurred when Spark was trying to create an HDFS directory under the warehouse in order to create the database. All processes (Spark master, worker, thrift server, beeline) were run as a user with the right access permissions. My Spark classpaths have /home/y/conf/hadoop in the front. I was able to read and write files under the same directory from the hadoop fs command line and also from the spark-shell without any issue. Any hints regarding the right way of configuring this would be appreciated. Thanks, Du
Re: [SPARK SQL] kerberos error when creating database from beeline/ThriftServer2
To clarify, this error was thrown from the thrift server when beeline was started to establish the connection, as follows: $ beeline -u jdbc:hive2://`hostname`:4080 –n username From: Du Li l...@yahoo-inc.com.INVALIDmailto:l...@yahoo-inc.com.INVALID Date: Tuesday, October 28, 2014 at 11:35 AM To: Cheng Lian lian.cs@gmail.commailto:lian.cs@gmail.com Cc: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: [SPARK SQL] kerberos error when creating database from beeline/ThriftServer2 If I put all the jar files from my local hive in the front of the spark class path, a different error was reported, as follows: 14/10/28 18:29:40 ERROR transport.TSaslTransport: SASL negotiation failure javax.security.sasl.SaslException: PLAIN auth failed: null at org.apache.hadoop.security.SaslPlainServer.evaluateResponse(SaslPlainServer.java:108) at org.apache.thrift.transport.TSaslTransport$SaslParticipant.evaluateChallengeOrResponse(TSaslTransport.java:528) at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:272) at org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41) at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:190) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:722) 14/10/28 18:29:40 ERROR server.TThreadPoolServer: Error occurred during processing of message. java.lang.RuntimeException: org.apache.thrift.transport.TTransportException: PLAIN auth failed: null at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:219) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:190) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:722) Caused by: org.apache.thrift.transport.TTransportException: PLAIN auth failed: null at org.apache.thrift.transport.TSaslTransport.sendAndThrowMessage(TSaslTransport.java:221) at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:305) at org.apache.thrift.transport.TSaslServerTransport.open(TSaslServerTransport.java:41) at org.apache.thrift.transport.TSaslServerTransport$Factory.getTransport(TSaslServerTransport.java:216) ... 4 more From: Cheng Lian lian.cs@gmail.commailto:lian.cs@gmail.com Date: Tuesday, October 28, 2014 at 2:50 AM To: Du Li l...@yahoo-inc.com.invalidmailto:l...@yahoo-inc.com.invalid Cc: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: [SPARK SQL] kerberos error when creating database from beeline/ThriftServer2 Which version of Spark and Hadoop are you using? Could you please provide the full stack trace of the exception? On Tue, Oct 28, 2014 at 5:48 AM, Du Li l...@yahoo-inc.com.invalidmailto:l...@yahoo-inc.com.invalid wrote: Hi, I was trying to set up Spark SQL on a private cluster. I configured a hive-site.xml under spark/conf that uses a local metestore with warehouse and default FS name set to HDFS on one of my corporate cluster. Then I started spark master, worker and thrift server. 
However, when creating a database from beeline, I got the following error:

org.apache.hive.service.cli.HiveSQLException: org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:Got exception: java.io.IOException Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: "spark-master-host"; destination host is: "HDFS-namenode":port; )

It occurred when Spark was trying to create an HDFS directory under the warehouse in order to create the database. All processes (Spark master, worker, thrift server, beeline) were run as a user with the right access permissions. My Spark classpaths have /home/y/conf/hadoop in the front. I was able to read and write files under the same directory from the hadoop fs command line and also from the spark-shell without any issue. Any hints regarding the right way of configuring this would be appreciated. Thanks, Du
[SPARK SQL] kerberos error when creating database from beeline/ThriftServer2
Hi, I was trying to set up Spark SQL on a private cluster. I configured a hive-site.xml under spark/conf that uses a local metastore with the warehouse and default FS name set to HDFS on one of my corporate clusters. Then I started the Spark master, worker, and thrift server. However, when creating a database from beeline, I got the following error:

org.apache.hive.service.cli.HiveSQLException: org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:Got exception: java.io.IOException Failed on local exception: java.io.IOException: org.apache.hadoop.security.AccessControlException: Client cannot authenticate via:[TOKEN, KERBEROS]; Host Details : local host is: "spark-master-host"; destination host is: "HDFS-namenode":port; )

It occurred when Spark was trying to create an HDFS directory under the warehouse in order to create the database. All processes (Spark master, worker, thrift server, beeline) were run as a user with the right access permissions. My Spark classpaths have /home/y/conf/hadoop in the front. I was able to read and write files under the same directory from the hadoop fs command line and also from the spark-shell without any issue. Any hints regarding the right way of configuring this would be appreciated. Thanks, Du
Re: HiveContext: cache table not supported for partitioned table?
Thanks for your explanation.

From: Cheng Lian lian.cs@gmail.com
Date: Thursday, October 2, 2014 at 8:01 PM
To: Du Li l...@yahoo-inc.com.INVALID, d...@spark.apache.org
Cc: user@spark.apache.org
Subject: Re: HiveContext: cache table not supported for partitioned table?

Cache table works with partitioned tables. I guess you're experimenting with a default local metastore and the metastore_db directory doesn't exist in the first place. In this case, none of the metastore tables/views exist at first, and the error message you saw is thrown when the PARTITIONS metastore table is accessed for the first time by the Hive client. However, you should also see this line before that error:

14/10/03 10:51:30 ERROR ObjectStore: Direct SQL failed, falling back to ORM

And then the table is created on the fly. The cache operation is also performed normally. You can verify this by selecting from it and checking the Spark UI for cached RDDs. If you uncache the table and cache it again, you won't see this error any more. Normally, in a production environment you won't see this error because the metastore database is usually set up ahead of time.

On 10/3/14 3:39 AM, Du Li wrote:

Hi, In Spark 1.1 HiveContext, I ran a create partitioned table command followed by a cache table command and got a java.sql.SQLSyntaxErrorException: Table/View 'PARTITIONS' does not exist. But cache table worked fine if the table is not a partitioned table. Can anybody confirm that caching of partitioned tables is not supported yet in the current version? Thanks, Du
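For reference, a minimal sketch exercising the behavior described above (table name and schema are illustrative; assuming a Spark 1.1 HiveContext with its default HiveQL dialect and a default local metastore):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object CachePartitionedTable {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cache-partitioned"))
    val hive = new HiveContext(sc)

    hive.sql("CREATE TABLE IF NOT EXISTS events (id INT, payload STRING) PARTITIONED BY (dt STRING)")

    // The first cache may log the PARTITIONS error described above while
    // metastore_db is created on the fly; caching should still succeed.
    hive.cacheTable("events")
    hive.uncacheTable("events")
    hive.cacheTable("events")   // second time around, no error is expected

    hive.sql("SELECT COUNT(*) FROM events").collect().foreach(println)
    sc.stop()
  }
}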
HiveContext: cache table not supported for partitioned table?
Hi, In Spark 1.1 HiveContext, I ran a create partitioned table command followed by a cache table command and got a java.sql.SQLSyntaxErrorException: Table/View 'PARTITIONS' does not exist. But cache table worked fine if the table is not a partitioned table. Can anybody confirm that cache of partitioned table is not supported yet in current version? Thanks, Du
Re: SparkSQL: map type MatchError when inserting into Hive table
It turned out to be a bug in my code: in the select clause, the list of fields was misaligned with the schema of the target table. As a consequence the map data couldn't be cast to some other type in the schema. Thanks anyway.

On 9/26/14, 8:08 PM, Cheng Lian lian.cs@gmail.com wrote:

Would you mind providing the DDL of this partitioned table together with the query you tried? The stack trace suggests that the query was trying to cast a map into something else, which is not supported in Spark SQL. And I doubt whether Hive supports casting a complex type to some other type.

On 9/27/14 7:48 AM, Du Li wrote:

Hi, I was loading data into a partitioned table on the Spark 1.1.0 beeline/thriftserver. The table has complex data types such as map<string,string> and array<map<string,string>>. The query is like "insert overwrite table a partition (…) select …" and the select clause worked if run separately. However, when running the insert query, there was an error as follows. The source code of Cast.scala seems to only handle the primitive data types, which is perhaps why the MatchError was thrown. I just wonder if this is still work in progress, or whether I should do it differently. Thanks, Du

scala.MatchError: MapType(StringType,StringType,true) (of class org.apache.spark.sql.catalyst.types.MapType)
org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247)
org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247)
org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263)
org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:84)
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:66)
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:50)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:149)
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:722)
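For reference, a minimal sketch of the fix described above, with the select list aligned to the target schema (table and column names are illustrative, not the original DDL):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object AlignedInsert {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("aligned-insert"))
    val hive = new HiveContext(sc)

    hive.sql("CREATE TABLE IF NOT EXISTS src (id INT, name STRING, attrs MAP<STRING,STRING>)")
    hive.sql("""CREATE TABLE IF NOT EXISTS dst (id INT, name STRING, attrs MAP<STRING,STRING>)
                PARTITIONED BY (dt STRING)""")

    // The select list must follow the target schema order (id, name, attrs).
    // Swapping name and attrs here is the kind of misalignment that produced the
    // MatchError above, because the map column ends up being cast to the wrong type.
    hive.sql("""INSERT OVERWRITE TABLE dst PARTITION (dt = '2014-09-27')
                SELECT id, name, attrs FROM src""")
    sc.stop()
  }
}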
view not supported in spark thrift server?
Can anybody confirm whether or not view is currently supported in spark? I found “create view translate” in the blacklist of HiveCompatibilitySuite.scala and also the following scenario threw NullPointerException on beeline/thriftserver (1.1.0). Any plan to support it soon? create table src(k string, v string); load data local inpath '/home/y/share/yspark/examples/src/main/resources/kv1.txt' into table src; create view kv as select k, v from src; select * from kv; Error: java.lang.NullPointerException (state=,code=0)
Re: view not supported in spark thrift server?
Thanks, Michael, for your quick response. View is critical for my project that is migrating from shark to spark SQL. I have implemented and tested everything else. It would be perfect if view could be implemented soon. Du From: Michael Armbrust mich...@databricks.commailto:mich...@databricks.com Date: Sunday, September 28, 2014 at 12:13 PM To: Du Li l...@yahoo-inc.com.invalidmailto:l...@yahoo-inc.com.invalid Cc: d...@spark.apache.orgmailto:d...@spark.apache.org d...@spark.apache.orgmailto:d...@spark.apache.org, user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: view not supported in spark thrift server? Views are not supported yet. Its not currently on the near term roadmap, but that can change if there is sufficient demand or someone in the community is interested in implementing them. I do not think it would be very hard. Michael On Sun, Sep 28, 2014 at 11:59 AM, Du Li l...@yahoo-inc.com.invalidmailto:l...@yahoo-inc.com.invalid wrote: Can anybody confirm whether or not view is currently supported in spark? I found “create view translate” in the blacklist of HiveCompatibilitySuite.scala and also the following scenario threw NullPointerException on beeline/thriftserver (1.1.0). Any plan to support it soon? create table src(k string, v string); load data local inpath '/home/y/share/yspark/examples/src/main/resources/kv1.txt' into table src; create view kv as select k, v from src; select * from kv; Error: java.lang.NullPointerException (state=,code=0)
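Until views are implemented, one possible stopgap inside a single application is to register the view's query as a temporary table. This is only a sketch: it assumes a HiveContext hc and the src table from the example, and temp tables live only in that context, so a separately running thrift server will not see them.

val kv = hc.hql("SELECT k, v FROM src")
kv.registerTempTable("kv")                       // stands in for "create view kv as select k, v from src"
hc.hql("SELECT * FROM kv").collect().foreach(println)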
SparkSQL: map type MatchError when inserting into Hive table
Hi, I was loading data into a partitioned table on Spark 1.1.0 beeline-thriftserver. The table has complex data types such as mapstring, string and arraymapstring,string. The query is like ³insert overwrite table a partition (Š) select Š² and the select clause worked if run separately. However, when running the insert query, there was an error as follows. The source code of Cast.scala seems to only handle the primitive data types, which is perhaps why the MatchError was thrown. I just wonder if this is still work in progress, or I should do it differently. Thanks, Du scala.MatchError: MapType(StringType,StringType,true) (of class org.apache.spark.sql.catalyst.types.MapType) org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:2 47) org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247) org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263) org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala :84) org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.appl y(Projection.scala:66) org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.appl y(Projection.scala:50) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sq l$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.sca la:149) org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHive File$1.apply(InsertIntoHiveTable.scala:158) org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHive File$1.apply(InsertIntoHiveTable.scala:158) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1 145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java: 615) java.lang.Thread.run(Thread.java:722) - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SparkSQL: map type MatchError when inserting into Hive table
It might be a problem when inserting into a partitioned table. It worked fine when the target table was unpartitioned. Can you confirm this? Thanks, Du On 9/26/14, 4:48 PM, Du Li l...@yahoo-inc.com.INVALID wrote: Hi, I was loading data into a partitioned table on Spark 1.1.0 beeline-thriftserver. The table has complex data types such as map<string,string> and array<map<string,string>>. The query is like "insert overwrite table a partition (...) select ..." and the select clause worked if run separately. However, when running the insert query, there was an error as follows. The source code of Cast.scala seems to only handle the primitive data types, which is perhaps why the MatchError was thrown. I just wonder if this is still work in progress, or if I should do it differently. Thanks, Du
scala.MatchError: MapType(StringType,StringType,true) (of class org.apache.spark.sql.catalyst.types.MapType)
org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247)
org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247)
org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263)
org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:84)
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:66)
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:50)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:149)
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:722)
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark SQL use of alias in where clause
Thanks, Yanbo and Nicholas. Now it makes more sense: query optimization is the answer. /Du From: Nicholas Chammas nicholas.cham...@gmail.com Date: Thursday, September 25, 2014 at 6:43 AM To: Yanbo Liang yanboha...@gmail.com Cc: Du Li l...@yahoo-inc.com.invalid, d...@spark.apache.org, user@spark.apache.org Subject: Re: Spark SQL use of alias in where clause That is correct. Aliases in the SELECT clause can only be referenced in the ORDER BY and HAVING clauses. Otherwise, you'll have to just repeat the expression, as with concat() in this case. A more elegant alternative, which is probably not available in Spark SQL yet, is to use Common Table Expressions: http://technet.microsoft.com/en-us/library/ms190766(v=sql.105).aspx. On Wed, Sep 24, 2014 at 11:32 PM, Yanbo Liang yanboha...@gmail.com wrote: Maybe it's the way SQL works. The select part is executed after the where filter is applied, so you cannot use an alias declared in the select part in the where clause. Hive and Oracle behave the same as Spark SQL. 2014-09-25 8:58 GMT+08:00 Du Li l...@yahoo-inc.com.invalid: Hi, The following query does not work in Shark nor in the new Spark SQLContext or HiveContext. SELECT key, value, concat(key, value) as combined from src where combined like '11%'; The following tweak of the syntax works fine, although it is a bit ugly. SELECT key, value, concat(key, value) as combined from src where concat(key,value) like '11%' order by combined; Are you going to support alias in the where clause soon? Thanks, Du
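Another workaround in the same spirit, sketched against the src table above and an assumed HiveContext hc: introduce the alias in a subquery so the outer WHERE can legally refer to it. This is only a sketch and has not been verified on every context.

val rows = hc.hql("""SELECT key, value, combined
  FROM (SELECT key, value, concat(key, value) AS combined FROM src) t
  WHERE combined LIKE '11%'
  ORDER BY combined""")
rows.collect().foreach(println)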
Spark SQL use of alias in where clause
Hi, The following query does not work in Shark nor in the new Spark SQLContext or HiveContext. SELECT key, value, concat(key, value) as combined from src where combined like '11%'; The following tweak of syntax works fine although a bit ugly. SELECT key, value, concat(key, value) as combined from src where concat(key,value) like '11%' order by combined; Are you going to support alias in where clause soon? Thanks, Du
SQL status code to indicate success or failure of query
Hi, After executing sql() in SQLContext or HiveContext, is there a way to tell whether the query/command succeeded or failed? Method sql() returns SchemaRDD which either is empty or contains some Rows of results. However, some queries and commands do not return results by nature; being empty is not indicative of their status of execution. For example, if the command is to create/drop a table, how could I know if the table has really been created/dropped? If it fails, is there a status code or something I could check to tell the reasons for failure? Thanks, Du
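There is no status code as such; failures generally surface as exceptions thrown by sql()/hql(). A sketch of one way to detect them, assuming a HiveContext hc and made-up table names: wrap the call and, for DDL, verify the effect afterwards.

import scala.util.{Failure, Success, Try}

Try(hc.hql("CREATE TABLE IF NOT EXISTS t1 (k INT, v STRING)")) match {
  case Success(_) =>
    // a follow-up query confirms the table really exists now
    val exists = hc.hql("SHOW TABLES").collect().exists(_.getString(0) == "t1")
    println(s"t1 created: $exists")
  case Failure(e) =>
    println(s"DDL failed: ${e.getMessage}")
}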
Re: problem with HiveContext inside Actor
I have figured it out. As shown in the code below, if the HiveContext hc were created in the MyActor object and used to create the db in response to a message, it would throw a NullPointerException. This is fixed by creating the HiveContext inside the MyActor class instead. I also tested the code by replacing Actor with Thread. The problem and fix are similar. Du
——
abstract class MyMessage
case object CreateDB extends MyMessage

object MyActor {
  def init(_sc: SparkContext) = {
    if (actorSystem == null || actorRef == null) {
      actorSystem = ActorSystem("root")
      actorRef = actorSystem.actorOf(Props(new MyActor(_sc)), "myactor")
    }
    //hc = new MyHiveContext(_sc)
  }
  def !(m: MyMessage) { actorRef ! m }
  //var hc: MyHiveContext = _
  private var actorSystem: ActorSystem = null
  private var actorRef: ActorRef = null
}

class MyActor(sc: SparkContext) extends Actor {
  val hc = new MyHiveContext(sc)
  def receive: Receive = {
    case CreateDB => hc.createDB()
  }
}

class MyHiveContext(sc: SparkContext) extends HiveContext(sc) {
  def createDB() {...}
}

From: Chester @work ches...@alpinenow.com Date: Thursday, September 18, 2014 at 7:17 AM To: Du Li l...@yahoo-inc.com.INVALID Cc: Michael Armbrust mich...@databricks.com, Cheng, Hao hao.ch...@intel.com, user@spark.apache.org Subject: Re: problem with HiveContext inside Actor Akka actors are managed by a thread pool, so the same actor can run on different threads. If you create the HiveContext in the actor, is it possible that you are essentially creating different instances of HiveContext? Sent from my iPhone On Sep 17, 2014, at 10:14 PM, Du Li l...@yahoo-inc.com.INVALID wrote: Thanks for your reply. Michael: No. I only create one HiveContext in the code. Hao: Yes. I subclass HiveContext and define my own function to create a database, and then subclass an akka Actor to call that function in response to an abstract message. As you suggested, I called println(sessionState.getConf.getAllProperties), which printed tons of properties; however, the same NullPointerException was still thrown. As mentioned, the weird thing is that everything worked fine if I simply called actor.hiveContext.createDB() directly. But it throws the null pointer exception from Driver.java if I do actor ! CreateSomeDB, which seems to me just the same thing because the actor does nothing but call createDB(). Du From: Michael Armbrust mich...@databricks.com Date: Wednesday, September 17, 2014 at 7:40 PM To: Cheng, Hao hao.ch...@intel.com Cc: Du Li l...@yahoo-inc.com.invalid, user@spark.apache.org Subject: Re: problem with HiveContext inside Actor - dev Is it possible that you are constructing more than one HiveContext in a single JVM? Due to global state in Hive code this is not allowed. Michael On Wed, Sep 17, 2014 at 7:21 PM, Cheng, Hao hao.ch...@intel.com wrote: Hi, Du I am not sure what you mean by "triggers the HiveContext to create a database"; do you create a subclass of HiveContext? Just be sure you call "HiveContext.sessionState" eagerly, since it will set the proper "hiveconf" into the SessionState, otherwise the HiveDriver will always get a null value when retrieving HiveConf. 
Cheng Hao From: Du Li [mailto:l...@yahoo-inc.com.INVALID] Sent: Thursday, September 18, 2014 7:51 AM To: user@spark.apache.orgmailto:user@spark.apache.org; d...@spark.apache.orgmailto:d...@spark.apache.org Subject: problem with HiveContext inside Actor Hi, Wonder anybody had similar experience or any suggestion here. I have an akka Actor that processes database requests in high-level messages. Inside this Actor, it creates a HiveContext object that does the actual db work. The main thread creates the needed SparkContext and passes in to the Actor to create the HiveContext. When a message is sent to the Actor, it is processed properly except that, when the message triggers the HiveContext to create a database, it throws a NullPointerException in hive.ql.Driver.java which suggests that its conf variable is not initialized. Ironically, it works fine if my main thread directly calls actor.hiveContext to create the database. The spark version is 1.1.0. Thanks, Du
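A short usage sketch for the pattern Du describes, assuming the corrected MyActor/MyHiveContext classes above and a SparkContext created on the main thread (names and settings are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("actor-hive"))
MyActor.init(sc)       // the actor, and its own HiveContext, are created here
MyActor ! CreateDB     // the database is created inside the actor, on the actor's thread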
problem with HiveContext inside Actor
Hi, Wonder anybody had similar experience or any suggestion here. I have an akka Actor that processes database requests in high-level messages. Inside this Actor, it creates a HiveContext object that does the actual db work. The main thread creates the needed SparkContext and passes in to the Actor to create the HiveContext. When a message is sent to the Actor, it is processed properly except that, when the message triggers the HiveContext to create a database, it throws a NullPointerException in hive.ql.Driver.java which suggests that its conf variable is not initialized. Ironically, it works fine if my main thread directly calls actor.hiveContext to create the database. The spark version is 1.1.0. Thanks, Du
Re: problem with HiveContext inside Actor
Thanks for your reply. Michael: No. I only create one HiveContext in the code. Hao: Yes. I subclass HiveContext and defines own function to create database and then subclass akka Actor to call that function in response to an abstract message. By your suggestion, I called println(sessionState.getConf.getAllProperties) that printed tons of properties; however, the same NullPointerException was still thrown. As mentioned, the weird thing is that everything worked fine if I simply called actor.hiveContext.createDB() directly. But it throws the null pointer exception from Driver.java if I do actor ! CreateSomeDB”, which seems to me just the same thing because the actor does nothing but call createDB(). Du From: Michael Armbrust mich...@databricks.commailto:mich...@databricks.com Date: Wednesday, September 17, 2014 at 7:40 PM To: Cheng, Hao hao.ch...@intel.commailto:hao.ch...@intel.com Cc: Du Li l...@yahoo-inc.com.invalidmailto:l...@yahoo-inc.com.invalid, user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: problem with HiveContext inside Actor - dev Is it possible that you are constructing more than one HiveContext in a single JVM? Due to global state in Hive code this is not allowed. Michael On Wed, Sep 17, 2014 at 7:21 PM, Cheng, Hao hao.ch...@intel.commailto:hao.ch...@intel.com wrote: Hi, Du I am not sure what you mean “triggers the HiveContext to create a database”, do you create the sub class of HiveContext? Just be sure you call the “HiveContext.sessionState” eagerly, since it will set the proper “hiveconf” into the SessionState, otherwise the HiveDriver will always get the null value when retrieving HiveConf. Cheng Hao From: Du Li [mailto:l...@yahoo-inc.com.INVALIDmailto:l...@yahoo-inc.com.INVALID] Sent: Thursday, September 18, 2014 7:51 AM To: user@spark.apache.orgmailto:user@spark.apache.org; d...@spark.apache.orgmailto:d...@spark.apache.org Subject: problem with HiveContext inside Actor Hi, Wonder anybody had similar experience or any suggestion here. I have an akka Actor that processes database requests in high-level messages. Inside this Actor, it creates a HiveContext object that does the actual db work. The main thread creates the needed SparkContext and passes in to the Actor to create the HiveContext. When a message is sent to the Actor, it is processed properly except that, when the message triggers the HiveContext to create a database, it throws a NullPointerException in hive.ql.Driver.java which suggests that its conf variable is not initialized. Ironically, it works fine if my main thread directly calls actor.hiveContext to create the database. The spark version is 1.1.0. Thanks, Du
Re: NullWritable not serializable
Hi, The test case is separated out as follows. The call to rdd2.first() breaks when the spark version is changed to 1.1.0, reporting the exception NullWritable not serializable. However, the same test passed with spark 1.0.2. The pom.xml file is attached. The test data README.md was copied from spark. Thanks, Du
-
package com.company.project.test

import org.scalatest._

class WritableTestSuite extends FunSuite {
  test("generated sequence file should be readable from spark") {
    import org.apache.hadoop.io.{NullWritable, Text}
    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.SparkContext._

    val conf = new SparkConf(false).setMaster("local").setAppName("test data exchange with spark")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("README.md")
    val res = rdd.map(x => (NullWritable.get(), new Text(x)))
    res.saveAsSequenceFile("./test_data")
    val rdd2 = sc.sequenceFile("./test_data", classOf[NullWritable], classOf[Text])
    assert(rdd.first == rdd2.first._2.toString)
  }
}

From: Matei Zaharia matei.zaha...@gmail.com Date: Monday, September 15, 2014 at 10:52 PM To: Du Li l...@yahoo-inc.com Cc: user@spark.apache.org, d...@spark.apache.org Subject: Re: NullWritable not serializable Can you post the exact code for the test that worked in 1.0? I can't think of much that could've changed. The one possibility is if we had some operations that were computed locally on the driver (this happens with things like first() and take(), which will try to do the first partition locally). But generally speaking these operations should *not* work over a network, so you'll have to make sure that you only send serializable types through shuffles or collects, or use a serialization framework like Kryo that might be okay with Writables. Matei On September 15, 2014 at 9:13:13 PM, Du Li (l...@yahoo-inc.com) wrote: Hi Matei, Thanks for your reply. The Writable classes have never been serializable and this is why it is weird. I did try as you suggested to map the Writables to integers and strings. It didn't pass, either. Similar exceptions were thrown, except that the messages became IntWritable, Text are not serializable. The reason is in the implicits defined in the SparkContext object that convert those values into their corresponding Writable classes before saving the data in a sequence file. My original code was actually some test cases to try out the SequenceFile related APIs. The tests all passed when the spark version was specified as 1.0.2. But this one failed after I changed the spark version to 1.1.0, the new release; nothing else changed. In addition, it failed when I called rdd2.collect(), take(1), and first(). But it worked fine when calling rdd2.count(). As you can see, count() does not need to serialize and ship data while the other three methods do. Do you recall any difference between spark 1.0 and 1.1 that might cause this problem? 
Thanks, Du From: Matei Zaharia matei.zaha...@gmail.commailto:matei.zaha...@gmail.com Date: Friday, September 12, 2014 at 9:10 PM To: Du Li l...@yahoo-inc.com.invalidmailto:l...@yahoo-inc.com.invalid, user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org, d...@spark.apache.orgmailto:d...@spark.apache.org d...@spark.apache.orgmailto:d...@spark.apache.org Subject: Re: NullWritable not serializable Hi Du, I don't think NullWritable has ever been serializable, so you must be doing something differently from your previous program. In this case though, just use a map() to turn your Writables to serializable types (e.g. null and String). Matie On September 12, 2014 at 8:48:36 PM, Du Li (l...@yahoo-inc.com.invalidmailto:l...@yahoo-inc.com.invalid) wrote: Hi, I was trying the following on spark-shell (built with apache master and hadoop 2.4.0). Both calling rdd2.collect and calling rdd3.collect threw java.io.NotSerializableException: org.apache.hadoop.io.NullWritable. I got the same problem in similar code of my app which uses the newly released Spark 1.1.0 under hadoop 2.4.0. Previously it worked fine with spark 1.0.2 under either hadoop 2.40 and 0.23.10. Anybody knows what caused the problem? Thanks, Du import org.apache.hadoop.io.{NullWritable, Text} val rdd = sc.textFile(README.md) val res = rdd.map(x = (NullWritable.get(), new Text(x))) res.saveAsSequenceFile(./test_data) val rdd2 = sc.sequenceFile(./test_data, classOf[NullWritable], classOf[Text]) rdd2.collect val rdd3 = sc.sequenceFile[NullWritable,Text](./test_data) rdd3.collect pom.xml Description: pom.xml - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
Re: Does Spark always wait for stragglers to finish running?
There is a parameter spark.speculation that is turned off by default. Look at the configuration doc: http://spark.apache.org/docs/latest/configuration.html From: Pramod Biligiri pramodbilig...@gmail.commailto:pramodbilig...@gmail.com Date: Monday, September 15, 2014 at 3:30 PM To: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Does Spark always wait for stragglers to finish running? Hi, I'm running Spark tasks with speculation enabled. I'm noticing that Spark seems to wait in a given stage for all stragglers to finish, even though the speculated alternative might have finished sooner. Is that correct? Is there a way to indicate to Spark not to wait for stragglers to finish? Thanks, Pramod -- http://twitter.com/pramodbiligiri
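For reference, a sketch of turning speculation on programmatically; these are the standard property names from the configuration page, and the values here are only examples, not recommendations.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")           // off by default
  .set("spark.speculation.interval", "100")   // how often to check for stragglers, in ms
  .set("spark.speculation.quantile", "0.75")  // fraction of tasks that must finish before speculating
  .set("spark.speculation.multiplier", "1.5") // how much slower than the median a task must be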
Re: NullWritable not serializable
Hi Matei, Thanks for your reply. The Writable classes have never been serializable and this is why it is weird. I did try as you suggested to map the Writables to integers and strings. It didn’t pass, either. Similar exceptions were thrown except that the messages became IntWritable, Text are not serializable. The reason is in the implicits defined in the SparkContext object that convert those values into their corresponding Writable classes before saving the data in sequence file. My original code was actual some test cases to try out SequenceFile related APIs. The tests all passed when the spark version was specified as 1.0.2. But this one failed after I changed the spark version to 1.1.0 the new release, nothing else changed. In addition, it failed when I called rdd2.collect(), take(1), and first(). But it worked fine when calling rdd2.count(). As you can see, count() does not need to serialize and ship data while the other three methods do. Do you recall any difference between spark 1.0 and 1.1 that might cause this problem? Thanks, Du From: Matei Zaharia matei.zaha...@gmail.commailto:matei.zaha...@gmail.com Date: Friday, September 12, 2014 at 9:10 PM To: Du Li l...@yahoo-inc.com.invalidmailto:l...@yahoo-inc.com.invalid, user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org, d...@spark.apache.orgmailto:d...@spark.apache.org d...@spark.apache.orgmailto:d...@spark.apache.org Subject: Re: NullWritable not serializable Hi Du, I don't think NullWritable has ever been serializable, so you must be doing something differently from your previous program. In this case though, just use a map() to turn your Writables to serializable types (e.g. null and String). Matie On September 12, 2014 at 8:48:36 PM, Du Li (l...@yahoo-inc.com.invalidmailto:l...@yahoo-inc.com.invalid) wrote: Hi, I was trying the following on spark-shell (built with apache master and hadoop 2.4.0). Both calling rdd2.collect and calling rdd3.collect threw java.io.NotSerializableException: org.apache.hadoop.io.NullWritable. I got the same problem in similar code of my app which uses the newly released Spark 1.1.0 under hadoop 2.4.0. Previously it worked fine with spark 1.0.2 under either hadoop 2.40 and 0.23.10. Anybody knows what caused the problem? Thanks, Du import org.apache.hadoop.io.{NullWritable, Text} val rdd = sc.textFile(README.md) val res = rdd.map(x = (NullWritable.get(), new Text(x))) res.saveAsSequenceFile(./test_data) val rdd2 = sc.sequenceFile(./test_data, classOf[NullWritable], classOf[Text]) rdd2.collect val rdd3 = sc.sequenceFile[NullWritable,Text](./test_data) rdd3.collect
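A sketch of the map()-based workaround Matei suggests, against the same test data and assuming the spark-shell's sc: convert the Writables to plain serializable types immediately after reading, before any collect/take/first.

import org.apache.hadoop.io.{NullWritable, Text}

val strings = sc.sequenceFile("./test_data", classOf[NullWritable], classOf[Text])
  .map { case (_, text) => text.toString }  // Text -> String while still on the executor side
strings.first()                             // now only Strings are shipped back to the driver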
NullWritable not serializable
Hi, I was trying the following on spark-shell (built with apache master and hadoop 2.4.0). Both calling rdd2.collect and calling rdd3.collect threw java.io.NotSerializableException: org.apache.hadoop.io.NullWritable. I got the same problem in similar code of my app which uses the newly released Spark 1.1.0 under hadoop 2.4.0. Previously it worked fine with spark 1.0.2 under either hadoop 2.4.0 or 0.23.10. Does anybody know what caused the problem? Thanks, Du

import org.apache.hadoop.io.{NullWritable, Text}
val rdd = sc.textFile("README.md")
val res = rdd.map(x => (NullWritable.get(), new Text(x)))
res.saveAsSequenceFile("./test_data")
val rdd2 = sc.sequenceFile("./test_data", classOf[NullWritable], classOf[Text])
rdd2.collect
val rdd3 = sc.sequenceFile[NullWritable, Text]("./test_data")
rdd3.collect
SparkSQL HiveContext TypeTag compile error
Hi, I have the following code snippet. It works fine on spark-shell, but in a standalone app it reports "No TypeTag available for MySchema" at compile time when calling hc.createSchemaRDD(rdd). Does anybody know what might be missing? Thanks, Du
--
import org.apache.spark.sql.hive.HiveContext

case class MySchema(key: Int, value: String)

val rdd = sc.parallelize((1 to 10).map(i => MySchema(i, s"val$i")))
val schemaRDD = hc.createSchemaRDD(rdd)
schemaRDD.registerTempTable("data")
val rows = hc.sql("select * from data")
rows.collect.foreach(println)
Re: SparkSQL HiveContext TypeTag compile error
Solved it. The problem occurred because the case class was defined within a test case in FunSuite. Moving the case class definition out of test fixed the problem. From: Du Li l...@yahoo-inc.com.INVALIDmailto:l...@yahoo-inc.com.INVALID Date: Thursday, September 11, 2014 at 11:25 AM To: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: SparkSQL HiveContext TypeTag compile error Hi, I have the following code snippet. It works fine on spark-shell but in a standalone app it reports No TypeTag available for MySchema” at compile time when calling hc.createScheamaRdd(rdd). Anybody knows what might be missing? Thanks, Du -- Import org.apache.spark.sql.hive.HiveContext case class MySchema(key: Int, value: String) val rdd = sc.parallelize((1 to 10).map(i = MySchema(i, sval$i))) val schemaRDD = hc.createSchemaRDD(rdd) schemaRDD.registerTempTable(data) val rows = hc.sql(select * from data) rows.collect.foreach(println)
Re: spark sql - create new_table as select * from table
The implementation of SparkSQL is currently incomplete. You may try it out with HiveContext instead of SQLContext. On 9/11/14, 1:21 PM, jamborta jambo...@gmail.com wrote: Hi, I am trying to create a new table from a select query as follows: CREATE TABLE IF NOT EXISTS new_table ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' STORED AS TEXTFILE LOCATION '/user/test/new_table' AS select * from table this works in Hive, but in Spark SQL (1.0.2) I am getting Unsupported language features in query error. Could you suggest why I am getting this? thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-create-new-t able-as-select-from-table-tp14006.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
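A sketch of the suggestion above, assuming an existing SparkContext sc: run the same statement through a HiveContext, which hands DDL such as CTAS to Hive, instead of the plain SQLContext.

import org.apache.spark.sql.hive.HiveContext

val hc = new HiveContext(sc)
hc.hql("""CREATE TABLE IF NOT EXISTS new_table
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
  STORED AS TEXTFILE LOCATION '/user/test/new_table'
  AS SELECT * FROM table""")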
Re: SparkSQL HiveContext TypeTag compile error
Just moving it out of test is not enough. Must move the case class definition to the top level. Otherwise it would report a runtime error of task not serializable when executing collect(). From: Du Li l...@yahoo-inc.com.INVALIDmailto:l...@yahoo-inc.com.INVALID Date: Thursday, September 11, 2014 at 12:33 PM To: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: SparkSQL HiveContext TypeTag compile error Solved it. The problem occurred because the case class was defined within a test case in FunSuite. Moving the case class definition out of test fixed the problem. From: Du Li l...@yahoo-inc.com.INVALIDmailto:l...@yahoo-inc.com.INVALID Date: Thursday, September 11, 2014 at 11:25 AM To: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: SparkSQL HiveContext TypeTag compile error Hi, I have the following code snippet. It works fine on spark-shell but in a standalone app it reports No TypeTag available for MySchema” at compile time when calling hc.createScheamaRdd(rdd). Anybody knows what might be missing? Thanks, Du -- Import org.apache.spark.sql.hive.HiveContext case class MySchema(key: Int, value: String) val rdd = sc.parallelize((1 to 10).map(i = MySchema(i, sval$i))) val schemaRDD = hc.createSchemaRDD(rdd) schemaRDD.registerTempTable(data) val rows = hc.sql(select * from data) rows.collect.foreach(println)
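Putting the two fixes together, a minimal sketch of the working layout (the suite name and settings are made up): the case class sits at the top level of the file, outside both the suite and the test body.

import org.scalatest.FunSuite
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Top level: visible to the TypeTag machinery at compile time and serializable at run time
case class MySchema(key: Int, value: String)

class MySchemaSuite extends FunSuite {
  test("createSchemaRDD works with a top-level case class") {
    val sc = new SparkContext(new SparkConf(false).setMaster("local").setAppName("typetag test"))
    val hc = new HiveContext(sc)
    val rdd = sc.parallelize((1 to 10).map(i => MySchema(i, s"val$i")))
    val schemaRDD = hc.createSchemaRDD(rdd)
    schemaRDD.registerTempTable("data")
    assert(hc.sql("select * from data").count() == 10)
    sc.stop()
  }
}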
Re: Table not found: using jdbc console to query sparksql hive thriftserver
SchemaRDD has a method insertInto(table). When the table is partitioned, it would be more sensible and convenient to extend it with a list of partition key and values. From: Denny Lee denny.g@gmail.commailto:denny.g@gmail.com Date: Thursday, September 11, 2014 at 6:39 PM To: Du Li l...@yahoo-inc.commailto:l...@yahoo-inc.com Cc: u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org u...@spark.incubator.apache.orgmailto:u...@spark.incubator.apache.org, alexandria1101 alexandria.shea...@gmail.commailto:alexandria.shea...@gmail.com Subject: Re: Table not found: using jdbc console to query sparksql hive thriftserver It sort of depends on the definition of efficiently. From a work flow perspective I would agree but from an I/O perspective, wouldn’t there be the same multi-pass from the standpoint of the Hive context needing to push the data into HDFS? Saying this, if you’re pushing the data into HDFS and then creating Hive tables via load (vs. a reference point ala external tables), I would agree with you. And thanks for correcting me, the registerTempTable is in the SqlContext. On September 10, 2014 at 13:47:24, Du Li (l...@yahoo-inc.commailto:l...@yahoo-inc.com) wrote: Hi Denny, There is a related question by the way. I have a program that reads in a stream of RDD¹s, each of which is to be loaded into a hive table as one partition. Currently I do this by first writing the RDD¹s to HDFS and then loading them to hive, which requires multiple passes of HDFS I/O and serialization/deserialization. I wonder if it is possible to do it more efficiently with Spark 1.1 streaming + SQL, e.g., by registering the RDDs into a hive context so that the data is loaded directly into the hive table in cache and meanwhile visible to jdbc/odbc clients. In the spark source code, the method registerTempTable you mentioned works on SqlContext instead of HiveContext. Thanks, Du On 9/10/14, 1:21 PM, Denny Lee denny.g@gmail.commailto:denny.g@gmail.com wrote: Actually, when registering the table, it is only available within the sc context you are running it in. For Spark 1.1, the method name is changed to RegisterAsTempTable to better reflect that. The Thrift server process runs under a different process meaning that it cannot see any of the tables generated within the sc context. You would need to save the sc table into Hive and then the Thrift process would be able to see them. HTH! On Sep 10, 2014, at 13:08, alexandria1101 alexandria.shea...@gmail.commailto:alexandria.shea...@gmail.com wrote: I used the hiveContext to register the tables and the tables are still not being found by the thrift server. Do I have to pass the hiveContext to JDBC somehow? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Table-not-found-using -jdbc-console-to-query-sparksql-hive-thriftserver-tp13840p13922.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.orgmailto:user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.orgmailto:user-h...@spark.apache.org
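As it stands, insertInto takes only a table name plus an overwrite flag. A sketch of the current usage, assuming a HiveContext hc, the src table, and an unpartitioned target_table (names made up); partitioned targets still have to go through an INSERT ... PARTITION statement, which is what the extension proposed above would avoid.

val schemaRDD = hc.hql("SELECT key, value FROM src")
schemaRDD.insertInto("target_table")        // append into an existing Hive table
schemaRDD.insertInto("target_table", true)  // overwrite the table contents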
Re: Table not found: using jdbc console to query sparksql hive thriftserver
Hi Denny, There is a related question by the way. I have a program that reads in a stream of RDD¹s, each of which is to be loaded into a hive table as one partition. Currently I do this by first writing the RDD¹s to HDFS and then loading them to hive, which requires multiple passes of HDFS I/O and serialization/deserialization. I wonder if it is possible to do it more efficiently with Spark 1.1 streaming + SQL, e.g., by registering the RDDs into a hive context so that the data is loaded directly into the hive table in cache and meanwhile visible to jdbc/odbc clients. In the spark source code, the method registerTempTable you mentioned works on SqlContext instead of HiveContext. Thanks, Du On 9/10/14, 1:21 PM, Denny Lee denny.g@gmail.com wrote: Actually, when registering the table, it is only available within the sc context you are running it in. For Spark 1.1, the method name is changed to RegisterAsTempTable to better reflect that. The Thrift server process runs under a different process meaning that it cannot see any of the tables generated within the sc context. You would need to save the sc table into Hive and then the Thrift process would be able to see them. HTH! On Sep 10, 2014, at 13:08, alexandria1101 alexandria.shea...@gmail.com wrote: I used the hiveContext to register the tables and the tables are still not being found by the thrift server. Do I have to pass the hiveContext to JDBC somehow? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Table-not-found-using -jdbc-console-to-query-sparksql-hive-thriftserver-tp13840p13922.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Table not found: using jdbc console to query sparksql hive thriftserver
Your tables were registered in the SqlContext, whereas the thrift server works with HiveContext. They seem to be in two different worlds today. On 9/9/14, 5:16 PM, alexandria1101 alexandria.shea...@gmail.com wrote: Hi, I want to use the sparksql thrift server in my application and make sure everything is loading and working. I built Spark 1.1 SNAPSHOT and ran the thrift server using ./sbin/start-thrift-server. In my application I load tables into schemaRDDs and I expect that the thrift-server should pick them up. In the app I then perform SQL queries on a table called mutation (the same name as the table I registered from the schemaRDD). I set the driver to org.apache.hive.jdbc.HiveDriver and the url to jdbc:hive2://localhost:1/mutation?zeroDateTimeBehavior=convertToNull . When I check the terminal for the thrift server output, it gets the query. However, I cannot use a jdbc console to communicate with it to show all of the databases and tables to see if mutation is loaded. I get the following errors: 14/09/09 16:51:02 WARN component.AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use java.net.BindException: Address already in use at sun.nio.ch.Net.bind0(Native Method) at sun.nio.ch.Net.bind(Net.java:444) at sun.nio.ch.Net.bind(Net.java:436) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) at org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConn ector.java:187) at org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java: 316) at org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelC onnector.java:265) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle .java:64) at org.eclipse.jetty.server.Server.doStart(Server.java:293) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle .java:64) at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1( JettyUtils.scala:192) at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:202) at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:202) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Ut ils.scala:1446) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1442) at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:202) at org.apache.spark.ui.WebUI.bind(WebUI.scala:102) at org.apache.spark.SparkContext.init(SparkContext.scala:224) at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:5 3) at com.illumina.phoenix.util.Runner.createSparkContext(Runner.java:144) at com.illumina.phoenix.etl.EtlPipelineRunner.main(EtlPipelineRunner.java:116 ) 1053 [main] WARN org.eclipse.jetty.util.component.AbstractLifeCycle - FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use java.net.BindException: Address already in use at sun.nio.ch.Net.bind0(Native Method) at sun.nio.ch.Net.bind(Net.java:444) at sun.nio.ch.Net.bind(Net.java:436) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) at org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConn ector.java:187) at org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java: 316) at 
org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelC onnector.java:265) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle .java:64) at org.eclipse.jetty.server.Server.doStart(Server.java:293) at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle .java:64) at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1( JettyUtils.scala:192) at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:202) at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:202) at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Ut ils.scala:1446) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1442) at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:202) at org.apache.spark.ui.WebUI.bind(WebUI.scala:102) at org.apache.spark.SparkContext.init(SparkContext.scala:224) at org.apache.spark.api.java.JavaSparkContext.init(JavaSparkContext.scala:5 3) at com.illumina.phoenix.util.Runner.createSparkContext(Runner.java:144)
Re: Execute HiveFormSpark ERROR.
As suggested in the error messages, double-check your class path. From: CharlieLin chury...@gmail.commailto:chury...@gmail.com Date: Tuesday, August 26, 2014 at 8:29 PM To: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Execute HiveFormSpark ERROR. hi, all : I tried to use Spark SQL on spark-shell, as the spark-example. When I execute : val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) import hiveContext._ hql(CREATE TABLE IF NOT EXISTS src (key INT, value STRING)) then report error like below: scala hiveContext.hql(CREATE TABLE IF NOT EXISTS src (key INT, value STRING)) 14/08/27 11:08:19 INFO ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING) 14/08/27 11:08:19 INFO ParseDriver: Parse Completed 14/08/27 11:08:19 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations 14/08/27 11:08:19 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences 14/08/27 11:08:19 INFO Analyzer: Max iterations (2) reached for batch Check Analysis 14/08/27 11:08:19 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Add exchange 14/08/27 11:08:19 INFO SQLContext$$anon$1: Max iterations (2) reached for batch Prepare Expressions 14/08/27 11:08:19 INFO Driver: PERFLOG method=Driver.run 14/08/27 11:08:19 INFO Driver: PERFLOG method=TimeToSubmit 14/08/27 11:08:19 INFO Driver: PERFLOG method=compile 14/08/27 11:08:19 INFO Driver: PERFLOG method=parse 14/08/27 11:08:19 INFO ParseDriver: Parsing command: CREATE TABLE IF NOT EXISTS src (key INT, value STRING) 14/08/27 11:08:19 INFO ParseDriver: Parse Completed 14/08/27 11:08:19 INFO Driver: /PERFLOG method=parse start=1409108899822 end=1409108899822 duration=0 14/08/27 11:08:19 INFO Driver: PERFLOG method=semanticAnalyze 14/08/27 11:08:19 INFO SemanticAnalyzer: Starting Semantic Analysis 14/08/27 11:08:19 INFO SemanticAnalyzer: Creating table src position=27 14/08/27 11:08:19 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore 14/08/27 11:08:19 INFO ObjectStore: ObjectStore, initialize called 14/08/27 11:08:20 WARN General: Plugin (Bundle) org.datanucleus is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/home/spark/spark/lib_managed/jars/datanucleus-core-3.2.2.jarfile:/home/spark/spark/lib_managed/jars/datanucleus-core-3.2.2.jar is already registered, and you are trying to register an identical plugin located at URL file:/home/spark/spark-1.0.2-2.0.0-mr1-cdh-4.2.1/lib_managed/jars/datanucleus-core-3.2.2.jar.file:/home/spark/spark-1.0.2-2.0.0-mr1-cdh-4.2.1/lib_managed/jars/datanucleus-core-3.2.2.jar. 14/08/27 11:08:20 WARN General: Plugin (Bundle) org.datanucleus.store.rdbms is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL file:/home/spark/spark-1.0.2-2.0.0-mr1-cdh-4.2.1/lib_managed/jars/datanucleus-rdbms-3.2.1.jarfile:/home/spark/spark-1.0.2-2.0.0-mr1-cdh-4.2.1/lib_managed/jars/datanucleus-rdbms-3.2.1.jar is already registered, and you are trying to register an identical plugin located at URL file:/home/spark/spark/lib_managed/jars/datanucleus-rdbms-3.2.1.jar.file:/home/spark/spark/lib_managed/jars/datanucleus-rdbms-3.2.1.jar. 14/08/27 11:08:20 WARN General: Plugin (Bundle) org.datanucleus.api.jdo is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. 
The URL file:/home/spark/spark/lib_managed/jars/datanucleus-api-jdo-3.2.1.jarfile:/home/spark/spark/lib_managed/jars/datanucleus-api-jdo-3.2.1.jar is already registered, and you are trying to register an identical plugin located at URL file:/home/spark/spark-1.0.2-2.0.0-mr1-cdh-4.2.1/lib_managed/jars/datanucleus-api-jdo-3.2.1.jar.file:/home/spark/spark-1.0.2-2.0.0-mr1-cdh-4.2.1/lib_managed/jars/datanucleus-api-jdo-3.2.1.jar. 14/08/27 11:08:20 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table src at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:958) at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:905) at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:8999) at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:8313) at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:284) at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:441) at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:342) at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:977) at org.apache.hadoop.hive.ql.Driver.run(Driver.java:888) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:189) at
SparkSQL returns ArrayBuffer for fields of type Array
Hi, Michael. I used HiveContext to create a table with a field of type Array. However, in the hql results, this field was returned as type ArrayBuffer which is mutable. Would it make more sense to be an Array? The Spark version of my test is 1.0.2. I haven’t tested it on SQLContext nor newer version of Spark yet. Thanks, Du
Re: SparkSQL returns ArrayBuffer for fields of type Array
I found this discrepancy when writing unit tests for my project. Basically the expectation was that the returned type should match that of the input data. Although it’s easy to work around, I was just feeling a bit weird. Is there a better reason to return ArrayBuffer? From: Michael Armbrust mich...@databricks.commailto:mich...@databricks.com Date: Wednesday, August 27, 2014 at 5:21 PM To: Du Li l...@yahoo-inc.commailto:l...@yahoo-inc.com Cc: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: SparkSQL returns ArrayBuffer for fields of type Array Arrays in the JVM are also mutable. However, you should not be relying on the exact type here. The only promise is that you will get back something of type Seq[_]. On Wed, Aug 27, 2014 at 4:27 PM, Du Li l...@yahoo-inc.commailto:l...@yahoo-inc.com wrote: Hi, Michael. I used HiveContext to create a table with a field of type Array. However, in the hql results, this field was returned as type ArrayBuffer which is mutable. Would it make more sense to be an Array? The Spark version of my test is 1.0.2. I haven’t tested it on SQLContext nor newer version of Spark yet. Thanks, Du
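For the unit-test expectation, a sketch of comparing against the Seq contract rather than a concrete collection class; row is an assumed Row coming back from an hql query whose first column is the array field, and the expected values are made up.

val actual = row(0).asInstanceOf[Seq[String]]  // may be an ArrayBuffer under the hood
val expected = Seq("a", "b", "c")
assert(actual == expected)                     // Seq equality is element-wise, so the concrete class does not matter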
unable to instantiate HiveMetaStoreClient on LocalHiveContext
Hi, I created an instance of LocalHiveContext and attempted to create a database. However, it failed with the message org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient. My code is as follows. Similar code worked on spark-shell and also with bin/run-example org.apache.spark.examples.sql.hive.HiveFromSpark.

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
import org.apache.spark.sql.hive.LocalHiveContext

val conf = new SparkConf(false).setMaster("local").setAppName("test data exchange with Hive")
conf.set("spark.driver.host", "localhost")
val sc = new SparkContext(conf)
val hc = new LocalHiveContext(sc)
hc.hql("create database if not exists testdb")

The exception was thrown out of the hql call. Did I miss any configuration? Thanks, Du
Re: Hive From Spark
I thought the fix had been pushed to the apache master ref. commit [SPARK-2848] Shade Guava in uber-jars By Marcelo Vanzin on 8/20. So my previous email was based on own build of the apache master, which turned out not working yet. Marcelo: Please correct me if I got that commit wrong. Thanks, Du On 8/22/14, 11:41 AM, Marcelo Vanzin van...@cloudera.com wrote: SPARK-2420 is fixed. I don't think it will be in 1.1, though - might be too risky at this point. I'm not familiar with spark-sql. On Fri, Aug 22, 2014 at 11:25 AM, Andrew Lee alee...@hotmail.com wrote: Hopefully there could be some progress on SPARK-2420. It looks like shading may be the voted solution among downgrading. Any idea when this will happen? Could it happen in Spark 1.1.1 or Spark 1.1.2? By the way, regarding bin/spark-sql? Is this more of a debugging tool for Spark job integrating with Hive? How does people use spark-sql? I'm trying to understand the rationale and motivation behind this script, any idea? Date: Thu, 21 Aug 2014 16:31:08 -0700 Subject: Re: Hive From Spark From: van...@cloudera.com To: l...@yahoo-inc.com.invalid CC: user@spark.apache.org; u...@spark.incubator.apache.org; pwend...@gmail.com Hi Du, I don't believe the Guava change has made it to the 1.1 branch. The Guava doc says hashInt was added in 12.0, so what's probably happening is that you have and old version of Guava in your classpath before the Spark jars. (Hadoop ships with Guava 11, so that may be the source of your problem.) On Thu, Aug 21, 2014 at 4:23 PM, Du Li l...@yahoo-inc.com.invalid wrote: Hi, This guava dependency conflict problem should have been fixed as of yesterday according to https://issues.apache.org/jira/browse/SPARK-2420 However, I just got java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/Ha shCode; by the following code snippet and ³mvn3 test² on Mac. I built the latest version of spark (1.1.0-SNAPSHOT) and installed the jar files to the local maven repo. From my pom file I explicitly excluded guava from almost all possible dependencies, such as spark-hive_2.10-1.1.0.SNAPSHOT, and hadoop-client. This snippet is abstracted from a larger project. So the pom.xml includes many dependencies although not all are required by this snippet. The pom.xml is attached. Anybody knows what to fix it? 
Thanks, Du --- package com.myself.test import org.scalatest._ import org.apache.hadoop.io.{NullWritable, BytesWritable} import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.SparkContext._ class MyRecord(name: String) extends Serializable { def getWritable(): BytesWritable = { new BytesWritable(Option(name).getOrElse(\\N).toString.getBytes(UTF-8)) } final override def equals(that: Any): Boolean = { if( !that.isInstanceOf[MyRecord] ) false else { val other = that.asInstanceOf[MyRecord] this.getWritable == other.getWritable } } } class MyRecordTestSuite extends FunSuite { // construct an MyRecord by Consumer.schema val rec: MyRecord = new MyRecord(James Bond) test(generated SequenceFile should be readable from spark) { val path = ./testdata/ val conf = new SparkConf(false).setMaster(local).setAppName(test data exchange with Hive) conf.set(spark.driver.host, localhost) val sc = new SparkContext(conf) val rdd = sc.makeRDD(Seq(rec)) rdd.map((x: MyRecord) = (NullWritable.get(), x.getWritable())) .saveAsSequenceFile(path) val bytes = sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable]).first._2 assert(rec.getWritable() == bytes) sc.stop() System.clearProperty(spark.driver.port) } } From: Andrew Lee alee...@hotmail.com Reply-To: user@spark.apache.org user@spark.apache.org Date: Monday, July 21, 2014 at 10:27 AM To: user@spark.apache.org user@spark.apache.org, u...@spark.incubator.apache.org u...@spark.incubator.apache.org Subject: RE: Hive From Spark Hi All, Currently, if you are running Spark HiveContext API with Hive 0.12, it won't work due to the following 2 libraries which are not consistent with Hive 0.12 and Hadoop as well. (Hive libs aligns with Hadoop libs, and as a common practice, they should be consistent to work inter-operable). These are under discussion in the 2 JIRA tickets: https://issues.apache.org/jira/browse/HIVE-7387 https://issues.apache.org/jira/browse/SPARK-2420 When I ran the command by tweaking the classpath and build for Spark 1.0.1-rc3, I was able to create table through HiveContext, however, when I fetch the data, due to incompatible API calls in Guava, it breaks. This is critical since it needs to map the cllumns to the RDD schema. Hive and Hadoop are using an older version of guava libraries (11.0.1) where Spark Hive is using guava 14.0.1+. The community isn't willing to downgrade to 11.0.1 which
Re: Hive From Spark
Hi, This guava dependency conflict problem should have been fixed as of yesterday according to https://issues.apache.org/jira/browse/SPARK-2420 However, I just got java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode; from the following code snippet and "mvn3 test" on Mac. I built the latest version of spark (1.1.0-SNAPSHOT) and installed the jar files to the local maven repo. In my pom file I explicitly excluded guava from almost all possible dependencies, such as spark-hive_2.10-1.1.0.SNAPSHOT and hadoop-client. This snippet is abstracted from a larger project, so the pom.xml includes many dependencies although not all are required by this snippet. The pom.xml is attached. Does anybody know how to fix it? Thanks, Du
---
package com.myself.test

import org.scalatest._
import org.apache.hadoop.io.{NullWritable, BytesWritable}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

class MyRecord(name: String) extends Serializable {
  def getWritable(): BytesWritable = {
    new BytesWritable(Option(name).getOrElse("\\N").toString.getBytes("UTF-8"))
  }

  final override def equals(that: Any): Boolean = {
    if (!that.isInstanceOf[MyRecord]) false
    else {
      val other = that.asInstanceOf[MyRecord]
      this.getWritable == other.getWritable
    }
  }
}

class MyRecordTestSuite extends FunSuite {
  // construct a MyRecord by Consumer.schema
  val rec: MyRecord = new MyRecord("James Bond")

  test("generated SequenceFile should be readable from spark") {
    val path = "./testdata/"
    val conf = new SparkConf(false).setMaster("local").setAppName("test data exchange with Hive")
    conf.set("spark.driver.host", "localhost")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(Seq(rec))
    rdd.map((x: MyRecord) => (NullWritable.get(), x.getWritable()))
      .saveAsSequenceFile(path)
    val bytes = sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable]).first._2
    assert(rec.getWritable() == bytes)
    sc.stop()
    System.clearProperty("spark.driver.port")
  }
}

From: Andrew Lee alee...@hotmail.com Reply-To: user@spark.apache.org Date: Monday, July 21, 2014 at 10:27 AM To: user@spark.apache.org, u...@spark.incubator.apache.org Subject: RE: Hive From Spark Hi All, Currently, if you are running the Spark HiveContext API with Hive 0.12, it won't work due to the following 2 libraries which are not consistent with Hive 0.12 and Hadoop as well. (Hive libs align with Hadoop libs, and as a common practice, they should be consistent to be inter-operable). These are under discussion in the 2 JIRA tickets: https://issues.apache.org/jira/browse/HIVE-7387 https://issues.apache.org/jira/browse/SPARK-2420 When I ran the command by tweaking the classpath and build for Spark 1.0.1-rc3, I was able to create a table through HiveContext; however, when I fetch the data, it breaks due to incompatible API calls in Guava. This is critical since it needs to map the columns to the RDD schema. Hive and Hadoop are using an older version of the guava libraries (11.0.1) whereas Spark Hive is using guava 14.0.1+. The community isn't willing to downgrade to 11.0.1, which is the current version for Hadoop 2.2 and Hive 0.12. Be aware of the protobuf version as well in Hive 0.12 (it uses protobuf 2.4). 
scala scala import org.apache.spark.SparkContext import org.apache.spark.SparkContext scala import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive._ scala scala val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@34bee01a scala scala hiveContext.hql(CREATE TABLE IF NOT EXISTS src (key INT, value STRING)) res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:104 == Query Plan == Native command: executed by Hive scala hiveContext.hql(LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src) res1: org.apache.spark.sql.SchemaRDD = SchemaRDD[3] at RDD at SchemaRDD.scala:104 == Query Plan == Native command: executed by Hive scala scala // Queries are expressed in HiveQL scala hiveContext.hql(FROM src SELECT key, value).collect().foreach(println) java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode; at org.apache.spark.util.collection.OpenHashSet.org$apache$spark$util$collection$OpenHashSet$$hashcode(OpenHashSet.scala:261) at org.apache.spark.util.collection.OpenHashSet$mcI$sp.getPos$mcI$sp(OpenHashSet.scala:165) at org.apache.spark.util.collection.OpenHashSet$mcI$sp.contains$mcI$sp(OpenHashSet.scala:102)