Re: flume spark streaming receiver host random
I think you may be missing a key word here. Are you saying that the machine has multiple interfaces and it is not using the one you expect, or that the receiver is not running on the machine you expect? On Sep 26, 2014 3:33 AM, centerqi hu cente...@gmail.com wrote: Hi all My code is as follows:

/usr/local/webserver/sparkhive/bin/spark-submit --class org.apache.spark.examples.streaming.FlumeEventCount --master yarn --deploy-mode cluster --queue online --num-executors 5 --driver-memory 6g --executor-memory 20g --executor-cores 5 target/scala-2.10/simple-project_2.10-1.0.jar 10.1.15.115 6

However, the receiver does not run on 10.1.15.115; instead one of the slave hosts is chosen at random. How can I solve this problem? Thanks -- cente...@gmail.com|齐忠
Re: Spark Streaming + Actors
Hi Team, Could you please respond to my request below. Regards, Rajesh On Thu, Sep 25, 2014 at 11:38 PM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote: Hi Team, Can I use Actors in Spark Streaming based on event type? Could you please review the test program below and let me know if anything needs to change with respect to best practices?

import akka.actor.Actor
import akka.actor.{ActorRef, Props}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import akka.actor.ActorSystem

case class one(r: org.apache.spark.rdd.RDD[String])
case class two(s: org.apache.spark.rdd.RDD[String])

class Events extends Actor {
  def receive = {
    // Based on event type - invoke respective methods asynchronously
    case one(r) => println("ONE COUNT " + r.count) // Invoke respective functions
    case two(s) => println("TWO COUNT " + s.count) // Invoke respective functions
  }
}

object Test {
  def main(args: Array[String]) {
    val system = ActorSystem("System")
    val event: ActorRef = system.actorOf(Props[Events], "events")
    val sparkConf = new SparkConf().setAppName("AlertsLinesCount").setMaster("local")
    val ssc = new StreamingContext(sparkConf, Seconds(30))
    val lines = ssc.textFileStream("hdfs://localhost:9000/user/rajesh/EventsDirectory/")
    lines.foreachRDD(x => {
      event ! one(x)
      event ! two(x)
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

Regards, Rajesh
Access by name in tuples in Scala with Spark
Could you advise on the best practice for using named tuples in Scala with Spark RDDs? Currently we can access a tuple field by number: RDD.map{_._2} But we would like a construction such as: RDD.map{_.itemId} This would be helpful for debugging purposes.
Re: Access by name in tuples in Scala with Spark
I think you are simply looking for a case class in Scala. It is a simple way to define an object with named, typed fields. On Fri, Sep 26, 2014 at 8:31 AM, rzykov rzy...@gmail.com wrote: Could you advise on the best practice for using named tuples in Scala with Spark RDDs? Currently we can access a tuple field by number: RDD.map{_._2} But we would like a construction such as: RDD.map{_.itemId} This would be helpful for debugging purposes.
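A minimal sketch of what that looks like (the record type and field names here are made up, not from the original post):

import org.apache.spark.rdd.RDD

case class Order(userId: Long, itemId: Long, price: Double)

def named(raw: RDD[(Long, Long, Double)]): RDD[Order] =
  raw.map { case (user, item, price) => Order(user, item, price) }

// Downstream code can then read by name instead of by position:
// named(raw).map(_.itemId) rather than raw.map(_._2)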
Spark SQL question: is cached SchemaRDD storage controlled by spark.storage.memoryFraction?
Hi, I'm querying a big table using Spark SQL. I see very long GC time in some stages. I wonder if I can improve it by tuning the storage parameter. The question is: the schemaRDD has been cached with the cacheTable() function. So is the cached schemaRDD part of memory storage controlled by the spark.storage.memoryFraction parameter? Thanks!
Re: Log hdfs blocks sending
Hello Andrew! Thanks for the reply. Which logs and at what level should I check? Driver, master or worker? I found this on the master node, but there is only an ANY locality requirement. Here is the driver (spark sql) log - https://gist.github.com/13h3r/c91034307caa33139001 and one of the workers' logs - https://gist.github.com/13h3r/6e5053cf0dbe33f2 Do you have any idea where to look? Thanks! On Fri, Sep 26, 2014 at 10:35 AM, Andrew Ash and...@andrewash.com wrote: Hi Alexey, You should see in the logs a locality measure like NODE_LOCAL, PROCESS_LOCAL, ANY, etc. If your Spark workers each have an HDFS data node on them and you're reading out of HDFS, then you should be seeing almost all NODE_LOCAL accesses. One cause I've seen for mismatches is if Spark uses short hostnames and Hadoop uses FQDNs -- in that case Spark doesn't think the data is local and does remote reads, which really kills performance. Hope that helps! Andrew On Thu, Sep 25, 2014 at 12:09 AM, Alexey Romanchuk alexey.romanc...@gmail.com wrote: Hello again spark users and developers! I have a standalone spark cluster (1.1.0) with spark sql running on it. My cluster consists of 4 datanodes and the replication factor of files is 3. I use the thrift server to access spark sql and have 1 table with 30+ partitions. When I run a query on the whole table (something simple like select count(*) from t) spark produces a lot of network activity, filling the entire available 1gb link. It looks like spark sends the data over the network instead of reading it locally. Is there any way to log which blocks were accessed locally and which were not? Thanks!
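If the hostname mismatch is ruled out and the scheduler still settles for ANY, one knob worth knowing is spark.locality.wait, which controls how long Spark waits for a data-local slot before falling back to a less local one. A sketch (the value is only an example; in Spark 1.1 it is given in milliseconds):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("locality-example")        // illustrative name
  .set("spark.locality.wait", "10000")   // default is 3000 ms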
Re: Job cancelled because SparkContext was shut down
Just wanted to answer my own question in case someone else runs into the same problem. It is related to the problem discussed here: http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html and here: https://issues.apache.org/jira/browse/SPARK-2121 It seems YARN kills some of the executors as they request more memory than expected.
Re: Issue with Spark-1.1.0 and the start-thriftserver.sh script
Hi Helene, Thanks for the report. In Spark 1.1, we use a special exit code to indicate that SparkSubmit failed because a class was not found. But unfortunately I chose a not-so-special exit code: 1… So whenever the process exits with 1, the -Phive error message is shown. A PR that changes 1 to 101 has been merged to master, hopefully to reduce potential exit code conflicts. Cheng On 9/25/14 5:44 PM, Hélène Delanoeye wrote: Hi We've just experienced an issue with the new Spark-1.1.0 and the start-thriftserver.sh script. We tried to launch start-thriftserver.sh with the --master yarn option and got the following error message:

Failed to load Hive Thrift server main class org.apache.spark.sql.hive.thriftserver.HiveThriftServer2. You need to build Spark with -Phive.

In fact Spark was built with the -Phive option, but the real problem was this one:

Application appattempt_1411058337040_0118_01 submitted by user x to unknown queue: default

So the solution was to specify the queue, and it works:

/opt/spark/sbin/start-thriftserver.sh --master yarn --queue spark-batch

Hope this could help, as the error message is not really clear (and rather wrong). Helene -- Hélène Delanoeye, Software engineer / Search team, Kelkoo
Re: Using one sql query's result inside another sql query
Hi Twinkle, The failure is caused by case sensitivity. The temp table actually stores the original un-analyzed logical plan, thus field names remain capital (F1, F2, etc.). I believe this issue has already been fixed by PR #2382 https://github.com/apache/spark/pull/2382. As a workaround, you can use lowercase letters in field names instead. Cheng On 9/25/14 1:18 PM, twinkle sachdeva wrote: Hi, I am using Hive Context to fire the sql queries inside spark. I have created a schemaRDD (let's call it cachedSchema) inside my code. If I fire a sql query (Query 1) on top of it, then it works. But if I refer to Query1's result inside another sql, that fails. Note that I have already registered Query1's result as a temp table.

registerTempTable(cachedSchema)
Queryresult1 = Query1 using cachedSchema [ works ]
registerTempTable(Queryresult1)
Queryresult2 = Query2 using Queryresult1 [ FAILS ]

Is it expected?? Any known workaround? Following is the exception I am receiving:

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'f1,'f2,'f3,'f4, tree:
Project ['f1,'f2,'f3,'f4]
 Filter ('count > 3)
  LowerCaseSchema
   Subquery x
    Project ['F1,'F2,'F3,'F4,'F6,'Count]
     LowerCaseSchema
      Subquery src
       SparkLogicalPlan (ExistingRdd [F1#0,F2#1,F3#2,F4#3,F5#4,F6#5,Count#6], MappedRDD[4] at map at SQLBlock.scala:64)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$anonfun$apply$1.applyOrElse(Analyzer.scala:72)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$anonfun$apply$1.applyOrElse(Analyzer.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:70)
at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:68)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1$anonfun$apply$2.apply(RuleExecutor.scala:61)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1$anonfun$apply$2.apply(RuleExecutor.scala:59)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1.apply(RuleExecutor.scala:59)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1.apply(RuleExecutor.scala:51)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:397)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:397)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan$lzycompute(HiveContext.scala:358)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan(HiveContext.scala:357)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:360)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360)
at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:120)
at org.apache.spark.rdd.RDD$anonfun$dependencies$2.apply(RDD.scala:191)
at org.apache.spark.rdd.RDD$anonfun$dependencies$2.apply(RDD.scala:189)
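A sketch of the lowercase workaround against the Spark 1.1 API (the record type and queries are illustrative, and cnt is used instead of count to avoid clashing with the aggregate function):

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

case class Rec(f1: String, cnt: Int)   // lowercase field names

object Workaround {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "workaround")
    val hiveContext = new HiveContext(sc)
    import hiveContext.createSchemaRDD   // implicit RDD -> SchemaRDD conversion

    sc.parallelize(Seq(Rec("a", 5), Rec("b", 2))).registerTempTable("src")
    hiveContext.sql("SELECT f1, cnt FROM src").registerTempTable("step1")
    // Referring to step1's (lowercase) fields now resolves:
    hiveContext.sql("SELECT f1 FROM step1 WHERE cnt > 3").collect().foreach(println)
  }
}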
executorAdded event to DAGScheduler
Can someone explain the motivation behind passing the executorAdded event to DAGScheduler? DAGScheduler does submitWaitingStages when the executorAdded method is called by TaskSchedulerImpl. I see an issue in the code below, from TaskSchedulerImpl.scala:

if (!executorsByHost.contains(o.host)) {
  executorsByHost(o.host) = new HashSet[String]()
  executorAdded(o.executorId, o.host)
  newExecAvail = true
}

Note that executorAdded is called only when there is a new host and not for every new executor. For instance, there can be two executors on the same host, but DAGScheduler's executorAdded is notified only for a new host, so only once in this case. If this is indeed an issue, I would like to submit a patch for this quickly. [cc Andrew Or] - Praveen
Re: executorAdded event to DAGScheduler
Some corrections. On Fri, Sep 26, 2014 at 5:32 PM, praveen seluka praveen.sel...@gmail.com wrote: Can someone explain the motivation behind passing the executorAdded event to DAGScheduler? DAGScheduler does submitWaitingStages when the executorAdded method is called by TaskSchedulerImpl. I see an issue in the code below, from TaskSchedulerImpl.scala:

if (!executorsByHost.contains(o.host)) {
  executorsByHost(o.host) = new HashSet[String]()
  executorAdded(o.executorId, o.host)
  newExecAvail = true
}

Note that executorAdded is called only when there is a new host and not for every new executor. For instance, there can be two executors on the same host, and in this case the DAGScheduler is notified only once. If this is indeed an issue, I would like to submit a patch for this quickly. [cc Andrew Or] - Praveen
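For reference, one possible shape of such a patch, keyed on the executor id rather than the host (a sketch only, using the executorIdToHost map that TaskSchedulerImpl already keeps; not the actual change):

if (!executorIdToHost.contains(o.executorId)) {
  executorIdToHost(o.executorId) = o.host
  executorAdded(o.executorId, o.host)   // notify per executor, not per host
  newExecAvail = true
}
if (!executorsByHost.contains(o.host)) {
  executorsByHost(o.host) = new HashSet[String]()
}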
Re: SparkSQL Thriftserver in Mesos
You can avoid installing Spark on each node by uploading the Spark distribution tarball to HDFS and setting spark.executor.uri to the HDFS location. In this way, Mesos will download and extract the tarball before launching containers. Please refer to this Spark documentation page http://spark.apache.org/docs/latest/running-on-mesos.html for details. However, using spark.executor.uri together with fine-grained mode (which is the default mode) really kills performance, because Mesos downloads and extracts the tarball every time a Spark task (not application) is launched. On 9/21/14 1:16 AM, John Omernik wrote: I am running the Thrift server in SparkSQL, and running it on the node I compiled spark on. When I run it, tasks only work if they landed on that node; other executors started on nodes I didn't compile spark on (and thus don't have the compile directory) fail. Should spark be distributed properly with the executor uri in my spark-defaults for mesos? Here is the error on nodes with lost executors: sh: 1: /opt/mapr/spark/spark-1.1.0-SNAPSHOT/sbin/spark-executor: not found
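For reference, the two settings involved, as they would appear in conf/spark-defaults.conf (the HDFS path is an example; coarse-grained mode avoids the per-task extraction mentioned above):

spark.executor.uri   hdfs:///spark/spark-1.1.0-bin-hadoop2.4.tgz
spark.mesos.coarse   true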
Re: executorAdded event to DAGScheduler
Just a quick reply: we cannot start two executors on the same host for a single application in the standard deployment (one worker per machine). I'm not sure if it will create an issue when you have multiple workers on the same host, as submitWaitingStages is called everywhere, and I have never tried such a deployment mode. Best, -- Nan Zhu On Friday, September 26, 2014 at 8:02 AM, praveen seluka wrote: Can someone explain the motivation behind passing the executorAdded event to DAGScheduler? DAGScheduler does submitWaitingStages when the executorAdded method is called by TaskSchedulerImpl. I see an issue in the code below, from TaskSchedulerImpl.scala:

if (!executorsByHost.contains(o.host)) {
  executorsByHost(o.host) = new HashSet[String]()
  executorAdded(o.executorId, o.host)
  newExecAvail = true
}

Note that executorAdded is called only when there is a new host and not for every new executor. For instance, there can be two executors on the same host, but DAGScheduler's executorAdded is notified only for a new host, so only once in this case. If this is indeed an issue, I would like to submit a patch for this quickly. [cc Andrew Or] - Praveen
Re: executorAdded event to DAGScheduler
In YARN, we can easily have multiple containers allocated on the same node. On Fri, Sep 26, 2014 at 6:05 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Just a quick reply: we cannot start two executors on the same host for a single application in the standard deployment (one worker per machine). I'm not sure if it will create an issue when you have multiple workers on the same host, as submitWaitingStages is called everywhere, and I have never tried such a deployment mode. Best, -- Nan Zhu On Friday, September 26, 2014 at 8:02 AM, praveen seluka wrote: Can someone explain the motivation behind passing the executorAdded event to DAGScheduler? DAGScheduler does submitWaitingStages when the executorAdded method is called by TaskSchedulerImpl. I see an issue in the code below, from TaskSchedulerImpl.scala. Note that executorAdded is called only when there is a new host and not for every new executor. For instance, there can be two executors on the same host, but DAGScheduler's executorAdded is notified only for a new host, so only once in this case. If this is indeed an issue, I would like to submit a patch for this quickly. [cc Andrew Or] - Praveen
How to run hive scripts programmatically in Spark 1.1.0?
I am unable to run hive scripts programmatically in Spark 1.1.0, but I can run them manually from the shell. Can anyone help me run hive scripts programmatically on a Spark 1.1.0 cluster on EMR? Manual running steps:

hadoop@ip-10-151-71-224:~/tmpSpark/spark1.1/spark$ ./bin/spark-shell --driver-memory 4G --executor-memory 4G
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/09/26 11:54:29 INFO SecurityManager: Changing view acls to: hadoop,
14/09/26 11:54:29 INFO SecurityManager: Changing modify acls to: hadoop,
14/09/26 11:54:29 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop, ); users with modify permissions: Set(hadoop, )
14/09/26 11:54:29 INFO HttpServer: Starting HTTP Server
14/09/26 11:54:29 INFO Utils: Successfully started service 'HTTP class server' on port 52081.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_40)
Type in expressions to have them evaluated.
Type :help for more information.
14/09/26 11:54:34 INFO SecurityManager: Changing view acls to: hadoop,
14/09/26 11:54:34 INFO SecurityManager: Changing modify acls to: hadoop,
14/09/26 11:54:34 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop, ); users with modify permissions: Set(hadoop, )
14/09/26 11:54:35 INFO Slf4jLogger: Slf4jLogger started
14/09/26 11:54:35 INFO Remoting: Starting remoting
14/09/26 11:54:35 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@ip-10-151-71-224.ec2.internal:46137]
14/09/26 11:54:35 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@ip-10-151-71-224.ec2.internal:46137]
14/09/26 11:54:35 INFO Utils: Successfully started service 'sparkDriver' on port 46137.
14/09/26 11:54:35 INFO SparkEnv: Registering MapOutputTracker
14/09/26 11:54:35 INFO SparkEnv: Registering BlockManagerMaster
14/09/26 11:54:35 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140926115435-fa1a
14/09/26 11:54:35 INFO Utils: Successfully started service 'Connection manager for block manager' on port 47623.
14/09/26 11:54:35 INFO ConnectionManager: Bound socket to port 47623 with id = ConnectionManagerId(ip-10-151-71-224.ec2.internal,47623)
14/09/26 11:54:35 INFO MemoryStore: MemoryStore started with capacity 2.1 GB
14/09/26 11:54:35 INFO BlockManagerMaster: Trying to register BlockManager
14/09/26 11:54:35 INFO BlockManagerMasterActor: Registering block manager ip-10-151-71-224.ec2.internal:47623 with 2.1 GB RAM
14/09/26 11:54:35 INFO BlockManagerMaster: Registered BlockManager
14/09/26 11:54:35 INFO HttpFileServer: HTTP File server directory is /tmp/spark-dc2260ea-18cc-4204-8f02-36bcc1df1126
14/09/26 11:54:35 INFO HttpServer: Starting HTTP Server
14/09/26 11:54:36 INFO Utils: Successfully started service 'HTTP file server' on port 49299.
14/09/26 11:54:41 INFO Utils: Successfully started service 'SparkUI' on port 4040.
14/09/26 11:54:41 INFO SparkUI: Started SparkUI at http://ip-10-151-71-224.ec2.internal:4040
14/09/26 11:54:41 INFO Executor: Using REPL class URI: http://10.151.71.224:52081
14/09/26 11:54:41 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@ip-10-151-71-224.ec2.internal:46137/user/HeartbeatReceiver
14/09/26 11:54:41 INFO SparkILoop: Created spark context..
Spark context available as sc.

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@3e77175c

scala> hiveContext.hql("CREATE EXTERNAL TABLE IF NOT EXISTS test (time string, id string) ROW FORMAT DELIMITED STORED AS TEXTFILE LOCATION 's3n://output/test/'")
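For anyone following along: what works in the shell above can be wrapped in a small application and launched with spark-submit. A sketch (the object name and jar path are illustrative; hql is the Spark 1.0/1.1 entry point for HiveQL):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object HiveScriptRunner {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("HiveScriptRunner"))
    val hiveContext = new HiveContext(sc)
    // Same statement that succeeds interactively in the shell:
    hiveContext.hql("CREATE EXTERNAL TABLE IF NOT EXISTS test (time string, id string) " +
      "ROW FORMAT DELIMITED STORED AS TEXTFILE LOCATION 's3n://output/test/'")
    sc.stop()
  }
}

// submitted with, e.g.:
//   ./bin/spark-submit --class HiveScriptRunner --driver-memory 4G my-app.jar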
Re: problem with HiveContext inside Actor
This is reasonable, since the actual constructor that gets called is Driver() rather than Driver(HiveConf). The former initializes the conf field by:

conf = SessionState.get().getConf()

And SessionState.get() reads a thread-local (TSS) value. Thus executing SQL queries within another thread causes an NPE, since the Driver is created in a thread different from the one in which the HiveContext (and the contained SessionState) was constructed. On 9/19/14 3:31 AM, Du Li wrote: I have figured it out. As shown in the code below, if the HiveContext hc were created in the actor object and used to create the db in response to a message, it would throw a null pointer exception. This is fixed by creating the HiveContext inside the MyActor class instead. I also tested the code by replacing Actor with Thread. The problem and fix are similar. Du

abstract class MyMessage
case object CreateDB extends MyMessage

object MyActor {
  def init(_sc: SparkContext) = {
    if (actorSystem == null || actorRef == null) {
      actorSystem = ActorSystem("root")
      actorRef = actorSystem.actorOf(Props(new MyActor(_sc)), "myactor")
    }
    //hc = new MyHiveContext(_sc)
  }

  def !(m: MyMessage) {
    actorRef ! m
  }

  //var hc: MyHiveContext = _
  private var actorSystem: ActorSystem = null
  private var actorRef: ActorRef = null
}

class MyActor(sc: SparkContext) extends Actor {
  val hc = new MyHiveContext(sc)
  def receive: Receive = {
    case CreateDB => hc.createDB()
  }
}

class MyHiveContext(sc: SparkContext) extends HiveContext(sc) {
  def createDB() {...}
}

From: Chester @work ches...@alpinenow.com Date: Thursday, September 18, 2014 at 7:17 AM To: Du Li l...@yahoo-inc.com.INVALID Cc: Michael Armbrust mich...@databricks.com, Cheng, Hao hao.ch...@intel.com, user@spark.apache.org Subject: Re: problem with HiveContext inside Actor Akka actors are managed under a thread pool, so the same actor can run under different threads. If you create the HiveContext in the actor, is it possible that you are essentially creating different instances of HiveContext? Sent from my iPhone On Sep 17, 2014, at 10:14 PM, Du Li l...@yahoo-inc.com.INVALID wrote: Thanks for your reply. Michael: No. I only create one HiveContext in the code. Hao: Yes. I subclass HiveContext and define my own function to create a database, and then subclass an akka Actor to call that function in response to an abstract message. Per your suggestion, I called println(sessionState.getConf.getAllProperties), which printed tons of properties; however, the same NullPointerException was still thrown. As mentioned, the weird thing is that everything worked fine if I simply called actor.hiveContext.createDB() directly. But it throws the null pointer exception from Driver.java if I do actor ! CreateSomeDB, which seems to me just the same thing, because the actor does nothing but call createDB(). Du From: Michael Armbrust mich...@databricks.com Date: Wednesday, September 17, 2014 at 7:40 PM To: Cheng, Hao hao.ch...@intel.com Cc: Du Li l...@yahoo-inc.com.invalid, user@spark.apache.org Subject: Re: problem with HiveContext inside Actor - dev Is it possible that you are constructing more than one HiveContext in a single JVM? Due to global state in Hive code this is not allowed. Michael On Wed, Sep 17, 2014 at 7:21 PM, Cheng, Hao hao.ch...@intel.com wrote: Hi, Du I am not sure what you mean by "triggers the HiveContext to create a database"; do you create a subclass of HiveContext? Just be sure you call "HiveContext.sessionState" eagerly, since it will set the proper "hiveconf" into the SessionState; otherwise the HiveDriver will always get a null value when retrieving the HiveConf. Cheng Hao From: Du Li [mailto:l...@yahoo-inc.com.INVALID] Sent: Thursday, September 18, 2014 7:51 AM To: user@spark.apache.org; d...@spark.apache.org Subject: problem with HiveContext inside Actor Hi, I wonder if anybody has had similar experience or any suggestion here. I have an akka Actor that processes database requests in high-level messages. Inside this Actor, it creates a HiveContext object that does the actual db work. The main thread creates the needed SparkContext and passes it in to the Actor to create the HiveContext. When a message is sent to the Actor, it is processed properly, except that when the message triggers the HiveContext to create a database, it throws a NullPointerException in hive.ql.Driver.java which suggests that its conf variable is not
Re: Systematic error when re-starting Spark stream unless I delete all checkpoints
Hi all, I apologise for re-posting this; I realise some mail systems are filtering all the code samples from the original post. I would greatly appreciate any pointer regarding this issue; it basically renders spark streaming not fault-tolerant for us. Thanks in advance, S --- I experience spark streaming restart issues similar to what is discussed in the 2 threads below (in which I failed to find a solution). Could anybody let me know if anything is wrong in the way I start/stop, or if this could be a spark bug? http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-td14847.html http://apache-spark-user-list.1001560.n3.nabble.com/KafkaReciever-Error-when-starting-ssc-Actor-name-not-unique-tc3978.html My stream reads a Kafka topic, does some processing involving an updateStateByKey and saves the result to HDFS. The context is (re)-created at startup as follows:

def streamContext() = {
  def newContext() = {
    val ctx = new StreamingContext(sparkConf, Duration(1))
    ctx.checkpoint("hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/")
    ctx
  }
  StreamingContext.getOrCreate("hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/", newContext)
}

And the start-up and shutdown of the stream is handled as follows:

try {
  val sparkContext = streamContext()
  [.. build stream here...]
  sparkContext.start()
  sparkContext.awaitTermination()
} catch {
  case e: Throwable =>
    log.error("shutting down tabulation stream...", e)
    sparkContext.stop()
    log.info("...waiting termination...")
    sparkContext.awaitTermination()
    log.info("...tabulation stream stopped")
}

When starting the stream for the first time (with spark-submit), the processing happens successfully, folders are created in the target HDFS folder and streaming stats are visible on http://sparkhost:4040/streaming. After letting the streaming work several minutes and then stopping it (ctrl-c on the command line), the following info is visible in the checkpoint folder:

mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
14/09/25 09:39:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable
Found 11 items
drwxr-xr-x - mnubohadoop hadoop 0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
-rw-r--r-- 3 mnubohadoop hadoop 5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165229
-rw-r--r-- 3 mnubohadoop hadoop 5512 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165229.bk
-rw-r--r-- 3 mnubohadoop hadoop 5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165230
-rw-r--r-- 3 mnubohadoop hadoop 5507 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165230.bk
-rw-r--r-- 3 mnubohadoop hadoop 5476 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165231
-rw-r--r-- 3 mnubohadoop hadoop 5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165231.bk
-rw-r--r-- 3 mnubohadoop hadoop 5477 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165232
-rw-r--r-- 3 mnubohadoop hadoop 5506 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165232.bk
-rw-r--r-- 3 mnubohadoop hadoop 5484 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165233
-rw-r--r-- 3 mnubohadoop hadoop 5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165233.bk

mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
14/09/25 09:42:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
drwxr-xr-x - mnubohadoop hadoop 0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8/rdd-8438
drwxr-xr-x - mnubohadoop hadoop 0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8/rdd-8542

(checkpoint clean-up seems to happen since the stream ran for much more than 5 times 10 seconds) When re-starting the stream, the startup fails with the error below, http://sparkhost:4040/streaming shows no statistics, no new HDFS folder is added in the target folder and no new checkpoints are created:

09:45:05.038 [main] ERROR c.mnubo.analytic.tabulate.StreamApp - shutting down tabulation stream...
org.apache.spark.SparkException: org.apache.spark.streaming.dstream.FilteredDStream@e8949a1 has not been initialized at
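One thing worth trying when ctrl-c is the shutdown path: stop the streaming context gracefully so in-flight batches and checkpoint writes complete before the JVM exits. A sketch, assuming ssc is the StreamingContext built by streamContext() above:

sys.addShutdownHook {
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}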
java.io.IOException Error in task deserialization
Has anyone else seen this error in task deserialization? The task is processing a small amount of data and doesn't seem to have much data hanging off the closure. I've only seen this with Spark 1.1.

Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, most recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com): java.io.IOException: unexpected exception type
java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)
Re: Access file name in map function
If the size of each file is small, you may try SparkContext.wholeTextFiles. Otherwise you can try something like this:

val filenames: Seq[String] = ...
val combined: RDD[(String, String)] = filenames.map { name =>
  sc.textFile(name).map(line => name -> line)
}.reduce(_ ++ _)

On 9/26/14 6:45 PM, Shekhar Bansal wrote: Hi In one of our use cases, the filename contains a timestamp and we have to append it to each record for aggregation. How can I access the filename in a map function? Thanks!
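And a sketch of the wholeTextFiles route for the original use case of tagging each record with its filename (the directory and separator are assumptions; each element is a (path, content) pair, so this suits small files):

val tagged = sc.wholeTextFiles("hdfs:///incoming/2014-09-26/")
  .flatMap { case (file, content) =>
    content.split("\n").map(line => file + "\t" + line)
  }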
Re: Spark SQL question: is cached SchemaRDD storage controlled by spark.storage.memoryFraction?
Yes it is. The in-memory storage used with SchemaRDD also uses RDD.cache() under the hood. On 9/26/14 4:04 PM, Haopu Wang wrote: Hi, I'm querying a big table using Spark SQL. I see very long GC time in some stages. I wonder if I can improve it by tuning the storage parameter. The question is: the schemaRDD has been cached with the cacheTable() function. So is the cached schemaRDD part of memory storage controlled by the spark.storage.memoryFraction parameter? Thanks!
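For completeness, a sketch of how the two pieces fit together (the fraction and table name are illustrative, not a recommendation):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val conf = new SparkConf()
  .setAppName("cache-example")
  .set("spark.storage.memoryFraction", "0.5")   // default is 0.6
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
sqlContext.cacheTable("big_table")   // assumes big_table is already registered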
mappartitions data size
Hi all, I am using mapPartitions to do some heavy computing on subsets of the data. I have a dataset with about 1m rows, running on a 32 core cluster. Unfortunately, it seems that mapPartitions splits the data into two sets, so it is only running on two cores. Is there a way to force it to split into smaller chunks? thanks,
Re: mappartitions data size
Use RDD.repartition (see here: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD). On Fri, Sep 26, 2014 at 10:19 AM, jamborta jambo...@gmail.com wrote: Hi all, I am using mapPartitions to do some heavy computing on subsets of the data. I have a dataset with about 1m rows, running on a 32 core cluster. Unfortunately, it seems that mapPartitions splits the data into two sets, so it is only running on two cores. Is there a way to force it to split into smaller chunks? thanks, -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
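A sketch of how that looks (assumes an existing SparkContext sc; the partition count and the per-row function are stand-ins):

def heavyComputation(row: String): String = row.reverse   // stand-in for the real work

val data = sc.parallelize(1 to 1000000).map(_.toString)
val result = data.repartition(64)                         // e.g. 2x the 32 cores
  .mapPartitions(iter => iter.map(heavyComputation))
result.count()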
Re: SPARK UI - Details post job processiong
Yes, I’m running Hadoop’s Timeline server that does this for the YARN/Hadoop logs (and works very nicely btw). Are you saying I can do the same for the SparkUI as well? Also, where do I set these Spark configurations, since this will be executed inside a YARN container? On the “client” machine via spark-env.sh? Do I pass these as command line arguments to spark-submit? Do I set them explicitly on my SparkConf? Thanks in advance. mn On Sep 25, 2014, at 9:13 PM, Andrew Ash and...@andrewash.com wrote: Matt, you should be able to set an HDFS path so you'll get logs written to a unified place instead of to local disk on a random box on the cluster. On Thu, Sep 25, 2014 at 1:38 PM, Matt Narrell matt.narr...@gmail.com wrote: How does this work with a cluster manager like YARN? mn On Sep 25, 2014, at 2:23 PM, Andrew Or and...@databricks.com wrote: Hi Harsha, You can turn on `spark.eventLog.enabled` as documented here: http://spark.apache.org/docs/latest/monitoring.html. Then, if you are running standalone mode, you can access the finished SparkUI through the Master UI. Otherwise, you can start a HistoryServer to display finished UIs. -Andrew 2014-09-25 12:55 GMT-07:00 Harsha HN 99harsha.h@gmail.com: Hi, The details laid out in the Spark UI for a job in progress are really interesting and very useful. But this vanishes once the job is done. Is there a way to get job details post processing? Looking for Spark UI data, not standard input, output and error info. Thanks, Harsha
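To Matt's question about where these go: any of the usual places works, since the driver reads them at startup: conf/spark-defaults.conf on the submitting machine, --conf flags to spark-submit, or SparkConf in code (which takes the highest precedence). A sketch with example values:

# conf/spark-defaults.conf
spark.eventLog.enabled  true
spark.eventLog.dir      hdfs:///user/spark/eventlogs   # example path

# or equivalently on the command line:
#   ./bin/spark-submit --conf spark.eventLog.enabled=true \
#     --conf spark.eventLog.dir=hdfs:///user/spark/eventlogs ...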
Re: Is it possible to use Parquet with Dremel encoding
Thank you Jey, That is a nice introduction, but it may be too old (AUG 21ST, 2013): "Note: If you keep the schema flat (without nesting), the Parquet files you create can be read by systems like Shark and Impala. These systems allow you to query Parquet files as tables using SQL-like syntax. The Parquet files created by this sample application could easily be queried using Shark for example." But in this post (http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-Nested-CaseClass-Parquet-failure-td8377.html) I found this: "Nested parquet is not supported in 1.0, but is part of the upcoming 1.0.1 release." So the question now is: can I use nested parquet files and still benefit from fast SQL lookups, or do I have to write a special map/reduce job to transform and find my data?
Re: Is it possible to use Parquet with Dremel encoding
Hi Matthes, Can you post an example of your schema? When you refer to nesting, are you referring to optional columns, nested schemas, or tables where there are repeated values? Parquet uses run-length encoding to compress down columns with repeated values, which is the case your example seems to refer to. The point Matt is making in his post is that if you have Parquet files that contain records with a nested schema, e.g.:

record MyNestedSchema {
  int nestedSchemaField;
}

record MySchema {
  int nonNestedField;
  MyNestedSchema nestedRecord;
}

not all systems support queries against these schemas. If you want to load the data directly into Spark, it isn't an issue. I'm not familiar with how SparkSQL is handling this, but I believe the bit you quoted is saying that support for nested queries (e.g., select ... from … where nestedRecord.nestedSchemaField == 0) will be added in Spark 1.0.1 (which is currently available, BTW). Regards, Frank Austin Nothaft fnoth...@berkeley.edu On Sep 26, 2014, at 7:38 AM, matthes mdiekst...@sensenetworks.com wrote: Thank you Jey, That is a nice introduction, but it may be too old (AUG 21ST, 2013): "Note: If you keep the schema flat (without nesting), the Parquet files you create can be read by systems like Shark and Impala. These systems allow you to query Parquet files as tables using SQL-like syntax." But in this post (http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-Nested-CaseClass-Parquet-failure-td8377.html) I found this: "Nested parquet is not supported in 1.0, but is part of the upcoming 1.0.1 release." So the question now is: can I use nested parquet files and still benefit from fast SQL lookups, or do I have to write a special map/reduce job to transform and find my data?
Re: SPARK UI - Details post job processiong
I am working on a PR that allows one to send the same spark listener event messages back to the application in yarn cluster mode. So far I have put this function in our application; our UI will receive and display the same spark job event messages such as progress, job start, completed etc. Essentially, it establishes a communication channel: you can send progress, messages and detailed exceptions from the spark job inside yarn to your application, and on your application side you can display them, log them, or make use of them in other ways. You can also send messages to the running spark job via the channel. I will clean up the code and send a PR soon. Chester Alpine Data Lab Sent from my iPhone On Sep 26, 2014, at 7:38 AM, Matt Narrell matt.narr...@gmail.com wrote: Yes, I’m running Hadoop’s Timeline server that does this for the YARN/Hadoop logs (and works very nicely btw). Are you saying I can do the same for the SparkUI as well? Also, where do I set these Spark configurations, since this will be executed inside a YARN container? On the “client” machine via spark-env.sh? Do I pass these as command line arguments to spark-submit? Do I set them explicitly on my SparkConf? Thanks in advance. mn On Sep 25, 2014, at 9:13 PM, Andrew Ash and...@andrewash.com wrote: Matt, you should be able to set an HDFS path so you'll get logs written to a unified place instead of to local disk on a random box on the cluster. On Thu, Sep 25, 2014 at 1:38 PM, Matt Narrell matt.narr...@gmail.com wrote: How does this work with a cluster manager like YARN? mn On Sep 25, 2014, at 2:23 PM, Andrew Or and...@databricks.com wrote: Hi Harsha, You can turn on `spark.eventLog.enabled` as documented here: http://spark.apache.org/docs/latest/monitoring.html. Then, if you are running standalone mode, you can access the finished SparkUI through the Master UI. Otherwise, you can start a HistoryServer to display finished UIs. -Andrew 2014-09-25 12:55 GMT-07:00 Harsha HN 99harsha.h@gmail.com: Hi, The details laid out in the Spark UI for a job in progress are really interesting and very useful. But this vanishes once the job is done. Is there a way to get job details post processing? Looking for Spark UI data, not standard input, output and error info. Thanks, Harsha
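For anyone who wants similar events without waiting for that PR, the public listener hook is a starting point; a minimal sketch (assumes an existing SparkContext sc; the println handlers are placeholders for whatever forwarding you need):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

sc.addSparkListener(new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart) =
    println("job " + jobStart.jobId + " started")
  override def onJobEnd(jobEnd: SparkListenerJobEnd) =
    println("job " + jobEnd.jobId + " finished: " + jobEnd.jobResult)
})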
Build error when using spark with breeze
Hi all, I'm using some functions from Breeze in a spark job but I get the following build error:

Error:scalac: bad symbolic reference. A signature in RandBasis.class refers to term math3
in package org.apache.commons which is not available.
It may be completely missing from the current classpath, or the version on
the classpath might be incompatible with the version used when compiling RandBasis.class.

In my case, I just declare a new Gaussian distribution: val g = new Gaussian(0d,1d) I'm using spark 1.1. Any ideas how to fix this? Best regards, Jao
Re: rsync problem
Hi, This is the command I am using for submitting my application, SimpleApp:

./bin/spark-submit --class org.apache.spark.examples.SimpleApp --deploy-mode client --master spark://karthik:7077 $SPARK_HOME/examples/*/scala-*/spark-examples-*.jar /text-data

On Thu, Sep 25, 2014 at 6:52 AM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, I assume you unintentionally did not reply to the list, so I'm adding it back to CC. How do you submit your job to the cluster? Tobias On Thu, Sep 25, 2014 at 2:21 AM, rapelly kartheek kartheek.m...@gmail.com wrote: How do I find out whether a node in the cluster is a master or a slave? Till now I was thinking that the slaves file under the conf folder makes the difference. Also, the SPARK_MASTER_IP in the spark-env.sh file. What else differentiates a slave from the master? On Wed, Sep 24, 2014 at 10:46 PM, rapelly kartheek kartheek.m...@gmail.com wrote: The job execution is taking place perfectly. Previously, all my print statements used to be stored in the spark/work/*/stdout file. But now, after doing the rsync, I find that none of the print statements are getting reflected in the stdout file under the work folder. But when I go to the code, I find the statements in the code. But they are not reflected in the stdout file as before. Can you please tell me where I went wrong? All I want is to see my modification in the code getting reflected in the output. On Wed, Sep 24, 2014 at 10:22 PM, rapelly kartheek kartheek.m...@gmail.com wrote: Hi, I have a very important and fundamental doubt: I have rsynced the entire spark folder from the master to all slaves in the cluster. When I execute a job, it's working perfectly. But when I rsync the entire spark folder of the master to all the slaves, am I not sending the master configurations to all the slaves and making the slaves behave like the master? First of all, is it correct to rsync the entire spark folder? And if I change only one file, then how do I rsync it to all? On Fri, Sep 19, 2014 at 8:44 PM, rapelly kartheek kartheek.m...@gmail.com wrote: Thank you Soumya Simantha and Tobias. I've deleted the contents of the work folder in all the nodes. Now it's working perfectly as it was before. Thank you Karthik On Fri, Sep 19, 2014 at 4:46 PM, Soumya Simanta soumya.sima...@gmail.com wrote: One possible reason is maybe that the checkpointing directory $SPARK_HOME/work is rsynced as well. Try emptying the contents of the work folder on each node and try again. On Fri, Sep 19, 2014 at 4:53 AM, rapelly kartheek kartheek.m...@gmail.com wrote: I followed this command: rsync -avL --progress path/to/spark-1.0.0 username@destinationhostname:path/to/destdirectory. Anyway, for now, I did it individually for each node. I have copied to each node, one at a time, using the above command. So I guess the copy may not contain any mixture of files. Also, as of now, I am not facing any MethodNotFound exceptions. But there is no job execution taking place. After some time, one by one, each node goes down and the cluster shuts down. On Fri, Sep 19, 2014 at 2:15 PM, Tobias Pfeiffer t...@preferred.jp wrote: Hi, On Fri, Sep 19, 2014 at 5:17 PM, rapelly kartheek kartheek.m...@gmail.com wrote: "you have copied a lot of files from various hosts to username@slave3:path" only from one node to all the other nodes... I don't think rsync can do that in one command as you described.
My guess is that now you have a wild mixture of jar files all across your cluster, which will lead to fancy exceptions like MethodNotFound etc.; that may be why your cluster is not working correctly. Tobias
Re: Build error when using spark with breeze
spark-core's dependency on commons-math3 is at test scope (core/pom.xml):

<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-math3</artifactId>
  <version>3.3</version>
  <scope>test</scope>
</dependency>

Adjusting the scope should solve the problem below. On Fri, Sep 26, 2014 at 8:42 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm using some functions from Breeze in a spark job but I get the following build error: Error:scalac: bad symbolic reference. A signature in RandBasis.class refers to term math3 in package org.apache.commons which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling RandBasis.class. In my case, I just declare a new Gaussian distribution: val g = new Gaussian(0d,1d) I'm using spark 1.1. Any ideas how to fix this? Best regards, Jao
Re: Is it possible to use Parquet with Dremel encoding
Hi Frank, thanks a lot for your response, this is very helpful! Actually I'm trying to figure out whether the current spark version supports repetition levels (https://blog.twitter.com/2013/dremel-made-simple-with-parquet), but now it looks good to me. It is very hard to find good material about that. Now I found this as well: https://git-wip-us.apache.org/repos/asf?p=spark.git;a=blob;f=sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala;h=1dc58633a2a68cd910c1bab01c3d5ee1eb4f8709;hb=f479cf37 I wasn't sure of that, because nested data can be many different things! If it works with SQL, finding the firstRepeatedid or secoundRepeatedid would be awesome. But if it only works with a kind of map/reduce job, that is also fine. The most important thing is to filter on the first or second repeated value, individually and in combination, as fast as possible. I'm starting to play with these things now to get the best search results! My schema looks like this:

val nestedSchema =
  """message nestedRowSchema {
       int32 firstRepeatedid;
       repeated group level1 {
         int64 secoundRepeatedid;
         repeated group level2 {
           int64 value1;
           int32 value2;
         }
       }
     }"""

Best, Matthes
How to do operations on multiple RDD's
Hi, This is my first post to the email list, so give me some feedback if I do something wrong. To do operations on two RDDs to produce a new one you can just use zipPartitions, but if I have an arbitrary number of RDDs that I would like to perform an operation on to produce a single RDD, how do I do that? I've been reading the docs but haven't found anything. For example: suppose I have a Seq of RDD[Array[Int]]'s and I want to take the majority of each array cell. So if the RDDs each hold one array, like this:

[1, 2, 3]
[0, 0, 0]
[1, 2, 0]

then the resulting RDD would have the array [1, 2, 0]. How do I approach this problem? It becomes too heavy to have an accumulator variable, I guess? Otherwise it could be an array of maps with values as keys and frequency as values. Essentially I want something like zipPartitions but for arbitrarily many RDDs. Is there any such functionality, or how would I approach this problem? Cheers, Johan
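One way to do the element-wise majority without an accumulator is to fold the Seq pairwise with RDD.zip, carrying a value-to-frequency map per cell (a sketch; zip requires all RDDs to have the same length and partitioning, and ties are broken arbitrarily):

import org.apache.spark.rdd.RDD

def majority(rdds: Seq[RDD[Array[Int]]]): RDD[Array[Int]] = {
  val counts = rdds
    .map(_.map(arr => arr.map(v => Map(v -> 1))))      // per cell: value -> frequency
    .reduce { (left, right) =>
      left.zip(right).map { case (a, b) =>
        a.zip(b).map { case (m1, m2) =>
          m2.foldLeft(m1) { case (m, (k, c)) => m.updated(k, m.getOrElse(k, 0) + c) }
        }
      }
    }
  counts.map(_.map(_.maxBy(_._2)._1))                  // most frequent value per cell
}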
Re: Build error when using spark with breeze
Thanks Ted. Can you tell me how to adjust the scope? On Fri, Sep 26, 2014 at 5:47 PM, Ted Yu yuzhih...@gmail.com wrote: spark-core's dependency on commons-math3 is at test scope (core/pom.xml):

<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-math3</artifactId>
  <version>3.3</version>
  <scope>test</scope>
</dependency>

Adjusting the scope should solve the problem below. On Fri, Sep 26, 2014 at 8:42 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm using some functions from Breeze in a spark job but I get the following build error: Error:scalac: bad symbolic reference. A signature in RandBasis.class refers to term math3 in package org.apache.commons which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling RandBasis.class. In my case, I just declare a new Gaussian distribution: val g = new Gaussian(0d,1d) I'm using spark 1.1. Any ideas how to fix this? Best regards, Jao
Re: Is it possible to use Parquet with Dremel encoding
Matthes, Ah, gotcha! Repeated items in Parquet seem to correspond to the ArrayType in Spark-SQL. I only use Spark, but it does look like that should be supported in Spark-SQL 1.1.0. I'm not sure though whether you can apply predicates to repeated items from Spark-SQL. Regards, Frank Austin Nothaft fnoth...@berkeley.edu On Sep 26, 2014, at 8:48 AM, matthes mdiekst...@sensenetworks.com wrote: Hi Frank, thanks a lot for your response, this is very helpful! Actually I'm trying to figure out whether the current spark version supports repetition levels (https://blog.twitter.com/2013/dremel-made-simple-with-parquet), but now it looks good to me. It is very hard to find good material about that. Now I found this as well: https://git-wip-us.apache.org/repos/asf?p=spark.git;a=blob;f=sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala;h=1dc58633a2a68cd910c1bab01c3d5ee1eb4f8709;hb=f479cf37 I wasn't sure of that, because nested data can be many different things! If it works with SQL, finding the firstRepeatedid or secoundRepeatedid would be awesome. But if it only works with a kind of map/reduce job, that is also fine. The most important thing is to filter on the first or second repeated value, individually and in combination, as fast as possible. I'm starting to play with these things now to get the best search results! My schema looks like this:

val nestedSchema =
  """message nestedRowSchema {
       int32 firstRepeatedid;
       repeated group level1 {
         int64 secoundRepeatedid;
         repeated group level2 {
           int64 value1;
           int32 value2;
         }
       }
     }"""

Best, Matthes
Re: Build error when using spark with breeze
Shouldn't the user's application depend on commons-math3 if it uses it? It shouldn't require a Spark change. Maybe I misunderstand. On Fri, Sep 26, 2014 at 4:47 PM, Ted Yu yuzhih...@gmail.com wrote: spark-core's dependency on commons-math3 is at test scope (core/pom.xml):

<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-math3</artifactId>
  <version>3.3</version>
  <scope>test</scope>
</dependency>

Adjusting the scope should solve the problem below. On Fri, Sep 26, 2014 at 8:42 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm using some functions from Breeze in a spark job but I get the following build error: Error:scalac: bad symbolic reference. A signature in RandBasis.class refers to term math3 in package org.apache.commons which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling RandBasis.class. In my case, I just declare a new Gaussian distribution: val g = new Gaussian(0d,1d) I'm using spark 1.1. Any ideas how to fix this? Best regards, Jao
Re: Build error when using spark with breeze
You can use a scope of runtime. See http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Scope Cheers On Fri, Sep 26, 2014 at 8:57 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Thanks Ted. Can you tell me how to adjust the scope? On Fri, Sep 26, 2014 at 5:47 PM, Ted Yu yuzhih...@gmail.com wrote: spark-core's dependency on commons-math3 is at test scope (core/pom.xml):

<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-math3</artifactId>
  <version>3.3</version>
  <scope>test</scope>
</dependency>

Adjusting the scope should solve the problem below. On Fri, Sep 26, 2014 at 8:42 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm using some functions from Breeze in a spark job but I get the following build error: Error:scalac: bad symbolic reference. A signature in RandBasis.class refers to term math3 in package org.apache.commons which is not available. It may be completely missing from the current classpath, or the version on the classpath might be incompatible with the version used when compiling RandBasis.class. In my case, I just declare a new Gaussian distribution: val g = new Gaussian(0d,1d) I'm using spark 1.1. Any ideas how to fix this? Best regards, Jao
Re: Build error when using spark with breeze
I solved the problem by including the commons-math3 package in my sbt dependencies, as Sean suggested. Thanks. On Fri, Sep 26, 2014 at 6:05 PM, Ted Yu yuzhih...@gmail.com wrote: You can use a scope of runtime. See http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Scope Cheers On Fri, Sep 26, 2014 at 8:57 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Thanks Ted. Can you tell me how to adjust the scope? On Fri, Sep 26, 2014 at 5:47 PM, Ted Yu yuzhih...@gmail.com wrote: spark-core's dependency on commons-math3 is @ test scope (core/pom.xml):

    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-math3</artifactId>
      <version>3.3</version>
      <scope>test</scope>
    </dependency>

Adjusting the scope should solve the problem below. On Fri, Sep 26, 2014 at 8:42 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm using some functions from Breeze in a Spark job but I get the following build error:

    Error:scalac: bad symbolic reference. A signature in RandBasis.class refers to term math3
    in package org.apache.commons which is not available.
    It may be completely missing from the current classpath, or the version on
    the classpath might be incompatible with the version used when compiling RandBasis.class.

In my case, I just declare a new Gaussian distribution: val g = new Gaussian(0d, 1d). I'm using Spark 1.1. Any ideas to fix this? Best regards, Jao
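For anyone hitting the same error, a build.sbt along these lines captures the fix (a minimal sketch; the Breeze version is an assumption, match whatever your project already uses):

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
      "org.scalanlp" %% "breeze" % "0.9",
      // spark-core only pulls commons-math3 at test scope, so declare it explicitly
      "org.apache.commons" % "commons-math3" % "3.3"
    )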
Re: spark-ec2 script with Tachyon
Hi, Did you manage to figure this out? I would appreciate if you could share the answer. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-ec2-script-with-Tachyon-tp9996p15249.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: how to run spark job on yarn with jni lib?
I assume you did those things on all machines, not just on the machine launching the job? I've seen that workaround used successfully (well, actually, they copied the library to /usr/lib or something, but same idea). On Thu, Sep 25, 2014 at 7:45 PM, taqilabon g945...@gmail.com wrote: You're right, I'm suffering from SPARK-1719. I've tried to add their location to /etc/ld.so.conf and I've submitted my job as a yarn-client, but the problem is the same: my native libraries are not loaded. Does this method work in your case? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-run-spark-job-on-yarn-with-jni-lib-tp15146p15195.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: java.io.IOException Error in task deserialization
I've had multiple jobs crash due to java.io.IOException: unexpected exception type; I've been running the 1.1 branch for some time and am now running the 1.1 release binaries. Note that I only use PySpark. I haven't kept detailed notes or the tracebacks around since there are other problems that have caused me greater grief (namely "key not found" errors). For me the exception seems to occur non-deterministically, which is a bit interesting since the error message shows that the same stage has failed multiple times. Are you able to consistently reproduce the bug across multiple invocations at the same place? On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote: Has anyone else seen this error in task deserialization? The task is processing a small amount of data and doesn't seem to have much data attached to the closure. I've only seen this with Spark 1.1.

    Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, most recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com): java.io.IOException: unexpected exception type
        java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
        java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:744)
Re: Build error when using spark with breeze
We removed commons-math3 from dependencies to avoid version conflict with hadoop-common. hadoop-common-2.3+ depends on commons-math3-3.1.1, while breeze depends on commons-math3-3.3, and 3.3 is not backward compatible with 3.1.1. So we removed it, because the breeze functions we use do not touch commons-math3 code. As Sean suggested, please include breeze in the dependency set of your project. Do not rely on transitive dependencies. -Xiangrui On Fri, Sep 26, 2014 at 9:08 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: I solved the problem by including the commons-math3 package in my sbt dependencies, as Sean suggested. Thanks. On Fri, Sep 26, 2014 at 6:05 PM, Ted Yu yuzhih...@gmail.com wrote: You can use a scope of runtime. See http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Scope Cheers On Fri, Sep 26, 2014 at 8:57 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Thanks Ted. Can you tell me how to adjust the scope? On Fri, Sep 26, 2014 at 5:47 PM, Ted Yu yuzhih...@gmail.com wrote: spark-core's dependency on commons-math3 is @ test scope (core/pom.xml):

    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-math3</artifactId>
      <version>3.3</version>
      <scope>test</scope>
    </dependency>

Adjusting the scope should solve the problem below. On Fri, Sep 26, 2014 at 8:42 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi all, I'm using some functions from Breeze in a Spark job but I get the following build error:

    Error:scalac: bad symbolic reference. A signature in RandBasis.class refers to term math3
    in package org.apache.commons which is not available.
    It may be completely missing from the current classpath, or the version on
    the classpath might be incompatible with the version used when compiling RandBasis.class.

In my case, I just declare a new Gaussian distribution: val g = new Gaussian(0d, 1d). I'm using Spark 1.1. Any ideas to fix this? Best regards, Jao - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to do operations on multiple RDD's
There are numerous ways to combine RDDs. In your case, it seems you have several RDDs of the same type and you want to do an operation across all of them as if they were a single RDD. The way to do this is SparkContext.union or RDD.union, which have minimal overhead. The only difference between these is the latter allows you to only union two at a time (but of course you can just call reduce on your sequence to union them all). Keep in mind this won't repartition anything, so if you find you have too many partitions after the union you could use RDD.coalesce to merge them. On Fri, Sep 26, 2014 at 11:55 AM, Johan Stenberg johanstenber...@gmail.com wrote: Hi, This is my first post to the email list so give me some feedback if I do something wrong. To do operations on two RDD's to produce a new one you can just use zipPartitions, but if I have an arbitrary number of RDD's that I would like to perform an operation on to produce a single RDD, how do I do that? I've been reading the docs but haven't found anything. For example: if I have a Seq of RDD[Array[Int]]'s and I want to take the majority of each array cell. So if all RDD's have one array which are like this: [1, 2, 3] [0, 0, 0] [1, 2, 0] Then the resulting RDD would have the array [1, 2, 0]. How do I approach this problem? It becomes too heavy to have an accumulator variable I guess? Otherwise it could be an array of maps with values as keys and frequency as values. Essentially I want something like zipPartitions but for arbitrarily many RDD's, is there any such functionality or how would I approach this problem? Cheers, Johan -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
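Putting that together for the majority example, a runnable sketch (the local master and the three one-array RDDs are just for illustration, mirroring the example from the question):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._ // pair-RDD functions in Spark 1.1

    object UnionMajority {
      def main(args: Array[String]) {
        val sc = new SparkContext(new SparkConf().setAppName("union-majority").setMaster("local"))
        val rdds = Seq(
          sc.parallelize(Seq(Array(1, 2, 3))),
          sc.parallelize(Seq(Array(0, 0, 0))),
          sc.parallelize(Seq(Array(1, 2, 0))))
        // One union across all RDDs; no shuffle happens here
        val all = sc.union(rdds)
        val majority = all
          .flatMap(_.zipWithIndex.map { case (v, i) => ((i, v), 1) })
          .reduceByKey(_ + _)                                // frequency per (cell, value)
          .map { case ((i, v), n) => (i, (v, n)) }
          .reduceByKey((a, b) => if (a._2 >= b._2) a else b) // ties broken arbitrarily
          .map { case (i, (v, _)) => (i, v) }
          .collect().sortBy(_._1).map(_._2)
        println(majority.mkString("[", ", ", "]")) // [1, 2, 0]
        sc.stop()
      }
    }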
Re: java.io.IOException Error in task deserialization
No, for me as well it is non-deterministic. It happens in a piece of code that does many filters and counts on a small set of records (~1k-10k). The original set is persisted in memory and we have a Kryo serializer set for it. The task itself takes in just a few filtering parameters. With the same settings, this has sometimes completed successfully and sometimes failed during this step. Arun On Fri, Sep 26, 2014 at 1:32 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: I've had multiple jobs crash due to java.io.IOException: unexpected exception type; I've been running the 1.1 branch for some time and am now running the 1.1 release binaries. Note that I only use PySpark. I haven't kept detailed notes or the tracebacks around since there are other problems that have caused me greater grief (namely "key not found" errors). For me the exception seems to occur non-deterministically, which is a bit interesting since the error message shows that the same stage has failed multiple times. Are you able to consistently reproduce the bug across multiple invocations at the same place? On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote: Has anyone else seen this error in task deserialization? The task is processing a small amount of data and doesn't seem to have much data attached to the closure. I've only seen this with Spark 1.1.

    Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, most recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com): java.io.IOException: unexpected exception type
        java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
        java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:744)
Re: java.io.IOException Error in task deserialization
FWIW I suspect that each count operation is an opportunity for you to trigger the bug, and each filter operation increases the likelihood of setting up the bug. I normally don't come across this error until my job has been running for an hour or two and has had a chance to build up longer lineages for some RDDs. It sounds like your data is a bit smaller, and it's more feasible for you to build up longer lineages more quickly. If you can reduce your number of filter operations (for example by combining some into a single function) that may help. It may also help to introduce persistence or checkpointing at intermediate stages so that the lineages that have to get replayed aren't as long. On Fri, Sep 26, 2014 at 11:10 AM, Arun Ahuja aahuj...@gmail.com wrote: No, for me as well it is non-deterministic. It happens in a piece of code that does many filters and counts on a small set of records (~1k-10k). The original set is persisted in memory and we have a Kryo serializer set for it. The task itself takes in just a few filtering parameters. With the same settings, this has sometimes completed successfully and sometimes failed during this step. Arun On Fri, Sep 26, 2014 at 1:32 PM, Brad Miller bmill...@eecs.berkeley.edu wrote: I've had multiple jobs crash due to java.io.IOException: unexpected exception type; I've been running the 1.1 branch for some time and am now running the 1.1 release binaries. Note that I only use PySpark. I haven't kept detailed notes or the tracebacks around since there are other problems that have caused me greater grief (namely "key not found" errors). For me the exception seems to occur non-deterministically, which is a bit interesting since the error message shows that the same stage has failed multiple times. Are you able to consistently reproduce the bug across multiple invocations at the same place? On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja aahuj...@gmail.com wrote: Has anyone else seen this error in task deserialization? The task is processing a small amount of data and doesn't seem to have much data attached to the closure. I've only seen this with Spark 1.1.

    Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, most recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com): java.io.IOException: unexpected exception type
        java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
        java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:744)
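To make the persist/checkpoint suggestion concrete, a minimal sketch of truncating a lineage that has grown through repeated filters (the path, sizes, and predicates are illustrative, not from the thread):

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("lineage-demo").setMaster("local"))
    sc.setCheckpointDir("hdfs:///tmp/checkpoints") // any reliable storage works
    val base = sc.parallelize(1 to 100000).persist()
    val narrowed = base.filter(_ % 2 == 0).filter(_ % 3 == 0)
    narrowed.checkpoint() // materialized on the next action; lineage is cut here
    narrowed.count()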
problem with spark-ec2 launch script Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found
Hi Davies The real issue is about cluster management. I am new to the Spark world and am not a system administrator. It seems like the problem is with the spark-ec2 launch script: it is installing an old version of Python. In the meantime I am trying to figure out how I can manually install the correct version on all the machines in my cluster. Thanks Andy From: Davies Liu dav...@databricks.com Date: Thursday, September 25, 2014 at 9:58 PM To: Andrew Davidson a...@santacruzintegration.com Cc: user@spark.apache.org Subject: Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found Maybe you have Python 2.7 on master but Python 2.6 in the cluster. You should upgrade Python to 2.7 in the cluster, or use Python 2.6 on master by setting PYSPARK_PYTHON=python2.6 On Thu, Sep 25, 2014 at 5:11 PM, Andy Davidson a...@santacruzintegration.com wrote: Hi I am running into trouble using IPython notebook on my cluster. I use the following command to set the cluster up:

    $ ./spark-ec2 --key-pair=$KEY_PAIR --identity-file=$KEY_FILE --region=$REGION --slaves=$NUM_SLAVES launch $CLUSTER_NAME

On master I launch Python as follows:

    $ IPYTHON_OPTS="notebook --pylab inline --no-browser --port=7000" $SPARK_HOME/bin/pyspark

It looks like the problem is that the cluster is using an old version of Python and IPython. Any idea how I can easily upgrade? The following version works on my mac. Thanks Andy

    {'commit_hash': '681fd77',
     'commit_source': 'installation',
     'default_encoding': 'UTF-8',
     'ipython_path': '/Library/Python/2.7/site-packages/IPython',
     'ipython_version': '2.1.0',
     'os_name': 'posix',
     'platform': 'Darwin-13.3.0-x86_64-i386-64bit',
     'sys_executable': '/usr/bin/python',
     'sys_platform': 'darwin',
     'sys_version': '2.7.5 (default, Mar 9 2014, 22:15:05) \n[GCC 4.2.1 Compatible Apple LLVM 5.0 (clang-500.0.68)]'}
Re: problem with spark-ec2 launch script Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found
Are you able to use the regular PySpark shell on your EC2 cluster? That would be the first thing to confirm is working. I don't know whether the version of Python on the cluster would affect whether IPython works or not, but if you want to try manually upgrading Python on a cluster launched by spark-ec2, there are some instructions in the comments here https://issues.apache.org/jira/browse/SPARK-922 for doing so. Nick On Fri, Sep 26, 2014 at 2:18 PM, Andy Davidson a...@santacruzintegration.com wrote: Hi Davies The real issue is about cluster management. I am new to the Spark world and am not a system administrator. It seems like the problem is with the spark-ec2 launch script: it is installing an old version of Python. In the meantime I am trying to figure out how I can manually install the correct version on all the machines in my cluster. Thanks Andy From: Davies Liu dav...@databricks.com Date: Thursday, September 25, 2014 at 9:58 PM To: Andrew Davidson a...@santacruzintegration.com Cc: user@spark.apache.org Subject: Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found Maybe you have Python 2.7 on master but Python 2.6 in the cluster. You should upgrade Python to 2.7 in the cluster, or use Python 2.6 on master by setting PYSPARK_PYTHON=python2.6 On Thu, Sep 25, 2014 at 5:11 PM, Andy Davidson a...@santacruzintegration.com wrote: Hi I am running into trouble using IPython notebook on my cluster. I use the following command to set the cluster up:

    $ ./spark-ec2 --key-pair=$KEY_PAIR --identity-file=$KEY_FILE --region=$REGION --slaves=$NUM_SLAVES launch $CLUSTER_NAME

On master I launch Python as follows:

    $ IPYTHON_OPTS="notebook --pylab inline --no-browser --port=7000" $SPARK_HOME/bin/pyspark

It looks like the problem is that the cluster is using an old version of Python and IPython. Any idea how I can easily upgrade? The following version works on my mac. Thanks Andy

    {'commit_hash': '681fd77',
     'commit_source': 'installation',
     'default_encoding': 'UTF-8',
     'ipython_path': '/Library/Python/2.7/site-packages/IPython',
     'ipython_version': '2.1.0',
     'os_name': 'posix',
     'platform': 'Darwin-13.3.0-x86_64-i386-64bit',
     'sys_executable': '/usr/bin/python',
     'sys_platform': 'darwin',
     'sys_version': '2.7.5 (default, Mar 9 2014, 22:15:05) \n[GCC 4.2.1 Compatible Apple LLVM 5.0 (clang-500.0.68)]'}
Re: java.lang.NegativeArraySizeException in pyspark
What is the error? Could you file a JIRA for it? Turns out there are actually three separate errors (indicated below), one of which silently returns the wrong value to the user. Should I file a separate JIRA for each one? What level should I mark these as (critical, major, etc.)? I'm not sure that all of these are bugs as much as feature requests, since it looks like the design of FramedSerializer includes some size constraints (https://github.com/apache/spark/blob/master/python/pyspark/serializers.py "Serializer that writes objects as a stream of (length, data) pairs, where C{length} is a 32-bit integer and data is C{length} bytes."). Attempting to reproduce the bug in isolation in IPython notebook, I've observed the following. Note that I'm running Python 2.7.3 on all machines and using the Spark 1.1.0 binaries.

**BLOCK 1** [no problem]

    import cPickle
    from pyspark import SparkContext

    def check_pre_serialized(size):
        msg = cPickle.dumps(range(2 ** size))
        print 'serialized length:', len(msg)
        bvar = sc.broadcast(msg)
        print 'length recovered from broadcast variable:', len(bvar.value)
        print 'correct value recovered:', msg == bvar.value
        bvar.unpersist()

    def check_unserialized(size):
        msg = range(2 ** size)
        bvar = sc.broadcast(msg)
        print 'correct value recovered:', msg == bvar.value
        bvar.unpersist()

    SparkContext.setSystemProperty('spark.executor.memory', '15g')
    SparkContext.setSystemProperty('spark.cores.max', '5')
    sc = SparkContext('spark://crosby.research.intel-research.net:7077', 'broadcast_bug')

**BLOCK 2** [no problem]

    check_pre_serialized(20)
    serialized length: 9374656
    length recovered from broadcast variable: 9374656
    correct value recovered: True

**BLOCK 3** [no problem]

    check_unserialized(20)
    correct value recovered: True

**BLOCK 4** [no problem]

    check_pre_serialized(27)
    serialized length: 1499501632
    length recovered from broadcast variable: 1499501632
    correct value recovered: True

**BLOCK 5** [no problem]

    check_unserialized(27)
    correct value recovered: True

**BLOCK 6** [ERROR 1: unhandled error from cPickle.dumps inside sc.broadcast]

    check_pre_serialized(28)
    .
    /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
        354
        355     def dumps(self, obj):
    --> 356         return cPickle.dumps(obj, 2)
        357
        358     loads = cPickle.loads

    SystemError: error return without exception set

**BLOCK 7** [no problem]

    check_unserialized(28)
    correct value recovered: True

**BLOCK 8** [ERROR 2: no error occurs and an incorrect result is returned]

    check_pre_serialized(29)
    serialized length: 6331339840
    length recovered from broadcast variable: 2036372544
    correct value recovered: False

**BLOCK 9** [ERROR 3: unhandled error from zlib.compress inside sc.broadcast]

    check_unserialized(29)
    ..
    /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
        418
        419     def dumps(self, obj):
    --> 420         return zlib.compress(self.serializer.dumps(obj), 1)
        421
        422     def loads(self, obj):

    OverflowError: size does not fit in an int

**BLOCK 10** [ERROR 1]

    check_pre_serialized(30)
    ...same as above...

**BLOCK 11** [ERROR 3]

    check_unserialized(30)
    ...same as above...

On Thu, Sep 25, 2014 at 2:55 PM, Davies Liu dav...@databricks.com wrote: On Thu, Sep 25, 2014 at 11:25 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Davies, Thanks for your help. I ultimately re-wrote the code to use broadcast variables, and then received an error when trying to broadcast self.all_models that the size did not fit in an int (recall that broadcasts use 32-bit ints to store size), What is the error? Could you file a JIRA for it?
that it was in fact over 2G. I don't know why the previous tests (described above) where duplicated portions of self.all_models worked (it could have been an error in either my debugging or notes), but splitting self.all_models into a separate broadcast variable for each element worked. I avoided broadcast variables for a while since there was no way to unpersist them in pyspark, but now that there is, you're completely right that using broadcast is the correct way to code this. In 1.1, you could use broadcast.unpersist() to release it; also, the performance of Python broadcast was much improved in 1.1. best, -Brad On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu dav...@databricks.com wrote: Or maybe there is a bug related to the base64 in py4j, could you dump the serialized bytes of the closure to verify this? You could add a line in spark/python/pyspark/rdd.py:

      ser = CloudPickleSerializer()
      pickled_command = ser.dumps(command)
    + print len(pickled_command), repr(pickled_command)

On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller bmill...@eecs.berkeley.edu wrote: Hi Davies, That's interesting to know. Here are more details about my code. The object (self) contains pointers
SF Scala: Spark and Machine Learning Videos
Folks -- we're happy to share the videos of Spark talks made at SF Scala meetup (sfscala.org) and Scala By the Bay conference (scalabythebay.org). We thank Databricks for presenting and also sponsoring the first talk video, which was a joint event with SF Bay Area Machine Learning meetup. 9/22/2014 -- SF Scala and SF Bay Area Machine Learning, Joseph Bradley: Decision Trees on Spark http://functional.tv/post/98342564544/sfscala-sfbaml-joseph-bradley-decision-trees-on-spark 8/9/2014 -- Scala By the Bay, Matei Zaharia: Next-Generation Languages meet Next-Generation Big Data: Leveraging Scala in Spark http://functional.tv/post/9769999/scala-by-the-bay2014-matei-zaharia-next-generation-langu 8/8/2014 -- Scala By the Bay, Tathagata Das, Large scale, real-time stream processing using Spark Streaming http://functional.tv/post/97739069219/scala-by-the-bay-2014-tathagata-das-large-scale-real-tim Functional.TV has all of the Scala By the Bay and SF Scala talks, and we publish them within days of our events. If you have a great talk on Scala and Spark, let us at SF Scala know (reply to me with the idea). Enjoy! A+ - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Fwd: Spark SQL question: is cached SchemaRDD storage controlled by spark.storage.memoryFraction?
-- Forwarded message -- From: Liquan Pei liquan...@gmail.com Date: Fri, Sep 26, 2014 at 1:33 AM Subject: Re: Spark SQL question: is cached SchemaRDD storage controlled by spark.storage.memoryFraction? To: Haopu Wang hw...@qilinsoft.com Hi Haopu, Internally, cacheTable on a SchemaRDD is implemented as a cache() on a MapPartitionsRDD. Since the memory reserved for caching RDDs is controlled by spark.storage.memoryFraction, the memory storage of a cached SchemaRDD is controlled by spark.storage.memoryFraction as well. Hope this helps! Liquan On Fri, Sep 26, 2014 at 1:04 AM, Haopu Wang hw...@qilinsoft.com wrote: Hi, I'm querying a big table using Spark SQL. I see very long GC time in some stages. I wonder if I can improve it by tuning the storage parameter. The question is: the SchemaRDD has been cached with the cacheTable() function. So is the cached SchemaRDD part of the memory storage controlled by the spark.storage.memoryFraction parameter? Thanks! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Liquan Pei Department of Physics University of Massachusetts Amherst
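As a quick illustration, a cached table competes for the same memory pool as any persisted RDD, so raising the fraction trades execution memory for cache space. A minimal sketch (the table name and fraction value are assumptions):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    val conf = new SparkConf()
      .setAppName("cached-schemardd")
      .setMaster("local")
      .set("spark.storage.memoryFraction", "0.5") // default is 0.6
    val sc = new SparkContext(conf)
    val hc = new HiveContext(sc)
    hc.cacheTable("big_table")  // assumes "big_table" exists; caching is lazy,
                                // the first query materializes it
    hc.sql("SELECT COUNT(*) FROM big_table").collect()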
Communication between threads within a worker
Hello, Can someone please explain to me how the various threads within a single worker (and hence a single JVM instance) communicate with each other? I mean, how do they send intermediate data/RDDs to each other? Is it done through the network? Please also point me to the location in the source code where I can look at the relevant code. Thanks, Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Communication-between-threads-within-a-worker-tp15262.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: problem with spark-ec2 launch script Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found
Many many thanks Andy From: Nicholas Chammas nicholas.cham...@gmail.com Date: Friday, September 26, 2014 at 11:24 AM To: Andrew Davidson a...@santacruzintegration.com Cc: Davies Liu dav...@databricks.com, user@spark.apache.org Subject: Re: problem with spark-ec2 launch script Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found Are you able to use the regular PySpark shell on your EC2 cluster? That would be the first thing to confirm is working. I don't know whether the version of Python on the cluster would affect whether IPython works or not, but if you want to try manually upgrading Python on a cluster launched by spark-ec2, there are some instructions in the comments here https://issues.apache.org/jira/browse/SPARK-922 for doing so. Nick On Fri, Sep 26, 2014 at 2:18 PM, Andy Davidson a...@santacruzintegration.com wrote: Hi Davies The real issue is about cluster management. I am new to the Spark world and am not a system administrator. It seems like the problem is with the spark-ec2 launch script: it is installing an old version of Python. In the meantime I am trying to figure out how I can manually install the correct version on all the machines in my cluster. Thanks Andy From: Davies Liu dav...@databricks.com Date: Thursday, September 25, 2014 at 9:58 PM To: Andrew Davidson a...@santacruzintegration.com Cc: user@spark.apache.org Subject: Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found Maybe you have Python 2.7 on master but Python 2.6 in the cluster. You should upgrade Python to 2.7 in the cluster, or use Python 2.6 on master by setting PYSPARK_PYTHON=python2.6 On Thu, Sep 25, 2014 at 5:11 PM, Andy Davidson a...@santacruzintegration.com wrote: Hi I am running into trouble using IPython notebook on my cluster. I use the following command to set the cluster up:

    $ ./spark-ec2 --key-pair=$KEY_PAIR --identity-file=$KEY_FILE --region=$REGION --slaves=$NUM_SLAVES launch $CLUSTER_NAME

On master I launch Python as follows:

    $ IPYTHON_OPTS="notebook --pylab inline --no-browser --port=7000" $SPARK_HOME/bin/pyspark

It looks like the problem is that the cluster is using an old version of Python and IPython. Any idea how I can easily upgrade? The following version works on my mac. Thanks Andy

    {'commit_hash': '681fd77',
     'commit_source': 'installation',
     'default_encoding': 'UTF-8',
     'ipython_path': '/Library/Python/2.7/site-packages/IPython',
     'ipython_version': '2.1.0',
     'os_name': 'posix',
     'platform': 'Darwin-13.3.0-x86_64-i386-64bit',
     'sys_executable': '/usr/bin/python',
     'sys_platform': 'darwin',
     'sys_version': '2.7.5 (default, Mar 9 2014, 22:15:05) \n[GCC 4.2.1 Compatible Apple LLVM 5.0 (clang-500.0.68)]'}
SparkSQL: map type MatchError when inserting into Hive table
Hi, I was loading data into a partitioned table on Spark 1.1.0 beeline-thriftserver. The table has complex data types such as map<string, string> and array<map<string, string>>. The query is like "insert overwrite table a partition (…) select …" and the select clause worked if run separately. However, when running the insert query, there was an error as follows. The source code of Cast.scala seems to only handle the primitive data types, which is perhaps why the MatchError was thrown. I just wonder if this is still work in progress, or I should do it differently. Thanks, Du

    scala.MatchError: MapType(StringType,StringType,true) (of class org.apache.spark.sql.catalyst.types.MapType)
        org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247)
        org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247)
        org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263)
        org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:84)
        org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:66)
        org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:50)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:149)
        org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
        org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:722)

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SparkSQL: map type MatchError when inserting into Hive table
It might be a problem when inserting into a partitioned table. It worked fine when the target table was unpartitioned. Can you confirm this? Thanks, Du On 9/26/14, 4:48 PM, Du Li l...@yahoo-inc.com.INVALID wrote: Hi, I was loading data into a partitioned table on Spark 1.1.0 beeline-thriftserver. The table has complex data types such as map<string, string> and array<map<string, string>>. The query is like "insert overwrite table a partition (…) select …" and the select clause worked if run separately. However, when running the insert query, there was an error as follows. The source code of Cast.scala seems to only handle the primitive data types, which is perhaps why the MatchError was thrown. I just wonder if this is still work in progress, or I should do it differently. Thanks, Du

    scala.MatchError: MapType(StringType,StringType,true) (of class org.apache.spark.sql.catalyst.types.MapType)
        org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247)
        org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247)
        org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263)
        org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:84)
        org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:66)
        org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:50)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:149)
        org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
        org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:722)

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: flume spark streaming receiver host random
the receiver is not running on the machine I expect 2014-09-26 14:09 GMT+08:00 Sean Owen so...@cloudera.com: I think you may be missing a key word here. Are you saying that the machine has multiple interfaces and it is not using the one you expect or the receiver is not running on the machine you expect? -- cente...@gmail.com|齐忠 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SparkSQL: map type MatchError when inserting into Hive table
Would you mind providing the DDL of this partitioned table together with the query you tried? The stacktrace suggests that the query was trying to cast a map into something else, which is not supported in Spark SQL. And I doubt whether Hive supports casting a complex type to some other type. On 9/27/14 7:48 AM, Du Li wrote: Hi, I was loading data into a partitioned table on Spark 1.1.0 beeline-thriftserver. The table has complex data types such as map<string, string> and array<map<string, string>>. The query is like "insert overwrite table a partition (…) select …" and the select clause worked if run separately. However, when running the insert query, there was an error as follows. The source code of Cast.scala seems to only handle the primitive data types, which is perhaps why the MatchError was thrown. I just wonder if this is still work in progress, or I should do it differently. Thanks, Du

    scala.MatchError: MapType(StringType,StringType,true) (of class org.apache.spark.sql.catalyst.types.MapType)
        org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247)
        org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247)
        org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263)
        org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:84)
        org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:66)
        org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:50)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:149)
        org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
        org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:722)

- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
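To make the request concrete, a hypothetical table of the shape being discussed (this DDL, the column names, the partition value, and the source table src are assumptions, not the poster's actual schema):

    val hc = new org.apache.spark.sql.hive.HiveContext(sc)
    hc.sql("""CREATE TABLE a (
                m MAP<STRING, STRING>,
                l ARRAY<MAP<STRING, STRING>>)
              PARTITIONED BY (dt STRING)""")
    // The failing pattern from the thread:
    hc.sql("INSERT OVERWRITE TABLE a PARTITION (dt = '2014-09-26') SELECT m, l FROM src")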
Re: problem with HiveContext inside Actor
This fix is reasonable, since the actual constructor that gets called is Driver() rather than Driver(HiveConf). The former initializes the conf field by:

    conf = SessionState.get().getConf()

and SessionState.get() reads a thread-local (TSS) value. Thus executing SQL queries within another thread causes an NPE, since the Driver is created in a thread different from the one in which the HiveContext (and the contained SessionState) gets constructed. On 9/19/14 3:31 AM, Du Li wrote: I have figured it out. As shown in the code below, if the HiveContext hc were created in the actor object and used to create the db in response to a message, it would throw a null pointer exception. This is fixed by creating the HiveContext inside the MyActor class instead. I also tested the code by replacing Actor with Thread. The problem and fix are similar. Du

    abstract class MyMessage
    case object CreateDB extends MyMessage

    object MyActor {
      def init(_sc: SparkContext) = {
        if (actorSystem == null || actorRef == null) {
          actorSystem = ActorSystem("root")
          actorRef = actorSystem.actorOf(Props(new MyActor(_sc)), "myactor")
        }
        // hc = new MyHiveContext(_sc)
      }
      def !(m: MyMessage) { actorRef ! m }
      // var hc: MyHiveContext = _
      private var actorSystem: ActorSystem = null
      private var actorRef: ActorRef = null
    }

    class MyActor(sc: SparkContext) extends Actor {
      val hc = new MyHiveContext(sc)
      def receive: Receive = {
        case CreateDB => hc.createDB()
      }
    }

    class MyHiveContext(sc: SparkContext) extends HiveContext(sc) {
      def createDB() {...}
    }

From: Chester @work ches...@alpinenow.com Date: Thursday, September 18, 2014 at 7:17 AM To: Du Li l...@yahoo-inc.com.INVALID Cc: Michael Armbrust mich...@databricks.com, Cheng, Hao hao.ch...@intel.com, user@spark.apache.org Subject: Re: problem with HiveContext inside Actor Akka actors are managed under a thread pool, so the same actor can be under different threads. If you create the HiveContext in the actor, is it possible that you are essentially creating different instances of HiveContext? Sent from my iPhone On Sep 17, 2014, at 10:14 PM, Du Li l...@yahoo-inc.com.INVALID wrote: Thanks for your reply. Michael: No. I only create one HiveContext in the code. Hao: Yes. I subclass HiveContext and define my own function to create the database, and then subclass an akka Actor to call that function in response to an abstract message. By your suggestion, I called println(sessionState.getConf.getAllProperties), which printed tons of properties; however, the same NullPointerException was still thrown. As mentioned, the weird thing is that everything worked fine if I simply called actor.hiveContext.createDB() directly. But it throws the null pointer exception from Driver.java if I do actor ! CreateSomeDB, which seems to me just the same thing, because the actor does nothing but call createDB(). Du From: Michael Armbrust mich...@databricks.com Date: Wednesday, September 17, 2014 at 7:40 PM To: Cheng, Hao hao.ch...@intel.com Cc: Du Li l...@yahoo-inc.com.invalid, user@spark.apache.org Subject: Re: problem with HiveContext inside Actor - dev Is it possible that you are constructing more than one HiveContext in a single JVM?
Due to global state in the Hive code this is not allowed. Michael On Wed, Sep 17, 2014 at 7:21 PM, Cheng, Hao hao.ch...@intel.com wrote: Hi, Du I am not sure what you mean by "triggers the HiveContext to create a database"; do you create a subclass of HiveContext? Just be sure you call "HiveContext.sessionState" eagerly, since it will set the proper "hiveconf" into the SessionState; otherwise the HiveDriver will always get a null value when retrieving the HiveConf. Cheng Hao From: Du Li [mailto:l...@yahoo-inc.com.INVALID] Sent: Thursday, September 18, 2014 7:51 AM To: user@spark.apache.org; d...@spark.apache.org Subject: problem with HiveContext inside Actor Hi, Wonder if anybody has had similar experience or any suggestion here. I have an akka Actor that processes database requests in high-level messages. Inside this Actor, it creates a HiveContext object that does the actual db work. The main thread creates the needed SparkContext and passes it in to the Actor to create the HiveContext. When a message is sent to the Actor, it is processed properly except that, when the message triggers the HiveContext to create a database, it throws a NullPointerException in hive.ql.Driver.java which suggests that its conf variable is not