RE: problem connection to hdfs on localhost from spark-shell
Change your conf/spark-env.sh:

export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf

Date: Thu, 28 Aug 2014 16:19:05 -0700
From: ml-node+s1001560n13074...@n3.nabble.com
To: linkpatrick...@live.com
Subject: problem connection to hdfs on localhost from spark-shell

I have HDFS servers running locally and hadoop dfs -ls / runs fine. From spark-shell I do this:

val lines = sc.textFile("hdfs:///test")

and I get this error message:

java.io.IOException: Failed on local exception: java.io.EOFException; Host Details : local host is: localhost.localdomain/127.0.0.1; destination host is: localhost:9000;

I tried changing the contents of /etc/hosts to no avail. I also tried URLs like hdfs://localhost:9000/test and many other variants. Nothing worked. Also, I see a message (during spark-shell startup) that it has bound to a 192.168.x.x address. Any help is appreciated.

-- Bharath
u'' notation with pyspark output data
Hi,

I am working with pyspark and doing a simple aggregation:

def doSplit(x):
    y = x.split(',')
    if len(y) == 3:
        return y[0], (y[1], y[2])

counts = lines.map(doSplit).groupByKey()
output = counts.collect()

Iterating over output I get data in this format: u'1385501280', u'14.0', but I actually need to work with 14 instead of u'14.0' and 1385501280 instead of u'1385501280'.

Question: how do I get the actual data without the u'' notation?

Thanks
Oleg.
Re: RE: The concurrent model of spark job/stage/task
hi, dear

Thanks for the response. Some comments below. And yes, I am using Spark on YARN.

1. The release doc of Spark says multiple jobs can be submitted in one application if the jobs (actions) are submitted by different threads. I wrote some Java thread code in the driver, one action in each thread, and the stages run concurrently, which I observed on the stages UI. In my understanding the DAGScheduler generates a different graph for each action. Not sure if that is correct. Originally I was hoping the SparkContext could generate different jobs for unrelated actions, but I never got that to work.

2. If the DAGScheduler generates a graph as below, can 1 and 2 run concurrently?

3. I want to retrieve the original data out of the RDD and run other computation on that data, like getting the value of temperature or other fields and working on them.

35597...@qq.com

From: linkpatrickliu
Date: 2014-08-29 14:01
To: user
Subject: RE: The concurrent model of spark job/stage/task

Hi,

Please see the answers following each question. If there's any mistake, please let me know. Thanks!

I am not sure which mode you are running, so I will assume you are using the spark-submit script to submit Spark applications to a Spark cluster (spark-standalone or YARN).

1. how to start 2 or more jobs in one spark driver, in java code.. I wrote 2 actions in the code, but the job still staged in index 0, 1, 2, 3... looks they run sequentially.

A Spark application is a job; you init the application by creating a SparkContext. The SparkContext will init the driver program for you. So if you want to run multiple jobs simultaneously, you have to split the jobs into different applications and submit each of them. The driver program is like an ApplicationMaster in YARN. It translates the Spark application into a DAG graph and schedules each stage to workers. Each stage consists of multiple tasks. The driver program handles the life cycle of a Spark application.

2. are the stages run concurrently? because they always number in order 0, 1, 2, 3.. I observed this on the Spark stage UI.

No. Stages will run sequentially. It's a DAG graph; each stage depends on its parent.

3. Can I retrieve the data out of RDD? like populate a pojo myself and compute on it.

Not sure what you mean. You can only retrieve an RDD related to your own SparkContext. But once a Spark application is finished, the SparkContext is released, and the RDDs related to that SparkContext are released too.

Best regards,
Patrick Liu

Date: Thu, 28 Aug 2014 18:35:44 -0700
From: [hidden email]
To: [hidden email]
Subject: The concurrent model of spark job/stage/task

hi, guys

I am trying to understand how Spark handles concurrency. I read the following from https://spark.apache.org/docs/1.0.2/job-scheduling.html:

"Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By 'job', in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark's scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users)."

I searched everywhere but could not figure out:

1. How to start 2 or more jobs in one Spark driver, in Java code. I wrote 2 actions in the code, but the jobs are still staged at index 0, 1, 2, 3... and look like they run sequentially.

2. Do the stages run concurrently? They always number in order 0, 1, 2, 3... as observed on the Spark stage UI.

3. Can I retrieve the data out of an RDD, like populating a POJO myself and computing on it?

Thanks in advance, guys.

[hidden email]
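For question 1, here is a minimal sketch (in Scala, though the same works from Java threads) of the pattern the docs describe: submitting two actions from separate threads so the scheduler can run the two jobs concurrently. The RDDs and counts here are illustrative, not from the thread:

import org.apache.spark.{SparkConf, SparkContext}

object ConcurrentJobs {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("ConcurrentJobs"))
    val rddA = sc.parallelize(1 to 1000000)
    val rddB = sc.parallelize(1 to 1000000)

    // each count() is an action, so each thread submits its own job;
    // SparkContext is thread-safe, so both jobs can be scheduled at once
    val t1 = new Thread(new Runnable { def run() { println("count A: " + rddA.count()) } })
    val t2 = new Thread(new Runnable { def run() { println("count B: " + rddB.count()) } })
    t1.start(); t2.start()
    t1.join(); t2.join()
    sc.stop()
  }
}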
Re: How to get prerelease thriftserver working?
Hey Michael, Cheng,

Thanks for the replies. Sadly I can't remember the specific error, so I'm going to chalk it up to user error, especially since others on the list have not had a problem.

@michael By the way, I was at the Spark 1.1 meetup yesterday. Great event, very informative; cheers and keep doing more!

@cheng Got it, cheers. Fortunately we don't have to deal with this use case, but that's good to know (especially the $SPARK_HOME bit).

On Wed, Aug 27, 2014 at 3:36 PM, Cheng Lian lian.cs@gmail.com wrote:

Hey Matt, if you want to access existing Hive data, you still need to run a Hive metastore service and provide a proper hive-site.xml (just drop it in $SPARK_HOME/conf). Could you provide the error log you saw?

On Wed, Aug 27, 2014 at 12:09 PM, Michael Armbrust mich...@databricks.com wrote:

I would expect that to work. What exactly is the error?

On Wed, Aug 27, 2014 at 6:02 AM, Matt Chu m...@kabam.com wrote:

(apologies for sending this twice, first via nabble; didn't realize it wouldn't get forwarded)

Hey,

I know it's not officially released yet, but I'm trying to understand (and run) the Thrift-based JDBC server, in order to enable remote JDBC access to our dev cluster. Before asking about details, is my understanding of this correct? `sbin/start-thriftserver` is a JDBC/Hive server that doesn't require running a Hive+MR cluster (i.e. just Spark/Spark+YARN)?

Assuming yes, I have hope that it all basically works, just that some documentation needs to be cleaned up:

- I found a release page implying that 1.1 will be released pretty soon-ish: https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage
- I can find recent (within the last 30 days or so) activity with promising titles: [Updated Spark SQL README to include the hive-thriftserver module](https://github.com/apache/spark/pull/1867), [[SPARK-2410][SQL] Merging Hive Thrift/JDBC server (with Maven profile fix)](https://github.com/apache/spark/pull/1620)

Am I following all the right email threads, issue trackers, and whatnot?

Specifically, I tried:

1. Building off of `branch-1.1`, synced as of ~today (2014 Aug 25)
2. Running `sbin/start-thriftserver.sh` in `yarn-client` mode
3. I can see the process running, and the spark context/app created in the yarn logs, and can connect to the thrift server on the default port of 10000 using `bin/beeline`
4. However, when I try to find out what that cluster has via `show tables;`, in the logs I see a connection error to some (what I assume to be) random port.

So what service am I forgetting/too ignorant to run? Or did I misunderstand and we do need a live Hive instance to back the thriftserver? Or is this a YARN-specific issue?

Only recently started learning the ecosystem and community, so apologies for the longer post and lots of questions. :)

Matt
Re: Spark SQL : how to find element where a field is in a given set
ok, but what if I have a long list? Do I need to hard-code every element of my list like this, or is there a function that translates a list into a tuple?

On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust mich...@databricks.com wrote:

You don't need the Seq, as in is a variadic function.

personTable.where('name in ("foo", "bar"))

On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

Hi all,

What is the expression that I should use with the Spark SQL DSL if I need to retrieve data with a field in a given set? For example, I have the following schema:

case class Person(name: String, age: Int)

And I need to do something like:

personTable.where('name in Seq("foo", "bar")) ?

Cheers,
Jaonary
Re: Failed to run runJob at ReceiverTracker.scala
I upped the ulimit to 128k files on all nodes. The job crashed again with "DAGScheduler: Failed to run runJob at ReceiverTracker.scala:275". I couldn't get the logs because I killed the job, and it looks like YARN wiped the container logs (not sure why it wipes the logs under /var/log/hadoop-yarn/container). Next time, I will grab the logs while the job is still active/zombie.

So is there a limit on how many times a receiver is re-spawned?

Thanks,
Tim

On Thu, Aug 28, 2014 at 10:06 PM, Tathagata Das tathagata.das1...@gmail.com wrote:

It did. It failed and got respawned 4 times. In this case, the "too many open files" is a sign that you need to increase the system-wide limit of open files. Try adding ulimit -n 16000 to your conf/spark-env.sh.

TD

On Thu, Aug 28, 2014 at 5:29 PM, Tim Smith secs...@gmail.com wrote:

Appeared after running for a while. I re-ran the job and this time, it crashed with:

14/08/29 00:18:50 WARN ReceiverTracker: Error reported by receiver for stream 0: Error in block pushing thread - java.net.SocketException: Too many open files

Shouldn't the failed receiver get re-spawned on a different worker?

On Thu, Aug 28, 2014 at 4:12 PM, Tathagata Das tathagata.das1...@gmail.com wrote:

Do you see this error right at the beginning or after running for some time? The root cause seems to be that somehow your Spark executors got killed, which killed the receivers and caused further errors. Please take a look at the executor logs of the lost executor to find the root cause of the executor failure.

TD

On Thu, Aug 28, 2014 at 3:54 PM, Tim Smith secs...@gmail.com wrote:

Hi,

Have a Spark-1.0.0 (CDH5) streaming job reading from Kafka that died with:

14/08/28 22:28:15 INFO DAGScheduler: Failed to run runJob at ReceiverTracker.scala:275
Exception in thread Thread-59
14/08/28 22:28:15 INFO YarnClientClusterScheduler: Cancelling stage 2
14/08/28 22:28:15 INFO DAGScheduler: Executor lost: 5 (epoch 4)
14/08/28 22:28:15 INFO BlockManagerMasterActor: Trying to remove executor 5 from BlockManagerMaster.
14/08/28 22:28:15 INFO BlockManagerMaster: Removed 5 successfully in removeExecutor
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0:0 failed 4 times, most recent failure: TID 6481 on host node-dn1-1.ops.sfdc.net failed for unknown reason
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Any insights into this error?

Thanks,
Tim
Re: DStream repartitioning, performance tuning processing
I set partitions to 64:

// kInMsg.repartition(64)
val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))

Still see all activity only on the two nodes that seem to be receiving from Kafka.

On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith secs...@gmail.com wrote:

TD - Apologies, didn't realize I was replying to you instead of the list.

What does numPartitions refer to when calling createStream? I read an earlier thread that seemed to suggest that numPartitions translates to partitions created on the Spark side: http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E

Actually, I re-tried with 64 numPartitions in createStream and that didn't work. I will manually set repartition to 64/128 and see how that goes.

Thanks.

On Thu, Aug 28, 2014 at 5:42 PM, Tathagata Das tathagata.das1...@gmail.com wrote:

Having 16 partitions in KafkaUtils.createStream does not translate to the RDDs in Spark / Spark Streaming having 16 partitions. Repartition is the best way to distribute the received data between all the nodes, as long as there are a sufficient number of partitions (try setting it to 2x the number of cores given to the application).

Yeah, in 1.0.0, ttl should be unnecessary.

On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith secs...@gmail.com wrote:

On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das tathagata.das1...@gmail.com wrote:

If you are repartitioning to 8 partitions, and your nodes happen to have at least 4 cores each, it's possible that all 8 partitions are assigned to only 2 nodes. Try increasing the number of partitions. Also make sure you have executors (allocated by YARN) running on more than two nodes if you want to use all 11 nodes in your YARN cluster.

If you look at the code, I commented out the manual re-partitioning to 8. Instead, I am creating 16 partitions when I call createStream. But I will increase the partitions to, say, 64 and see if I get better parallelism.

If you are using Spark 1.x, then you don't need to set the ttl for running Spark Streaming. In case you are using an older version, why do you want to reduce it? You could reduce it, but it does increase the risk of premature cleaning if once in a while things get delayed by 20 seconds. I don't see much harm in keeping the ttl at 60 seconds (a bit of extra garbage shouldn't hurt performance).

I am running 1.0.0 (CDH5), so the ttl setting is redundant? But you are right, unless I have memory issues, more aggressive pruning won't help.

Thanks,

Tim

TD

On Thu, Aug 28, 2014 at 3:16 PM, Tim Smith secs...@gmail.com wrote:

Hi,

In my streaming app, I receive from Kafka where I have tried setting the partitions when calling createStream or later, by calling repartition - in both cases, the number of nodes running the tasks seems to be stubbornly stuck at 2. Since I have 11 nodes in my cluster, I was hoping to use more nodes.

I am starting the job as:

nohup spark-submit --class logStreamNormalizer --master yarn log-stream-normalizer_2.10-1.0.jar --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar --executor-memory 30G --spark.cleaner.ttl 60 --executor-cores 8 --num-executors 8 > normRunLog-6.log 2> normRunLogError-6.log & echo $! > run-6.pid
My main code is:

val sparkConf = new SparkConf().setAppName("SparkKafkaTest")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val kInMsg = KafkaUtils.createStream(ssc, "node-nn1-1:2181/zk_kafka", "normApp", Map("rawunstruct" -> 16))
val propsMap = Map("metadata.broker.list" -> "node-dn1-6:9092,node-dn1-7:9092,node-dn1-8:9092", "serializer.class" -> "kafka.serializer.StringEncoder", "producer.type" -> "async", "request.required.acks" -> "1")
val to_topic = "normStruct"
val writer = new KafkaOutputService(to_topic, propsMap)

if (!configMap.keySet.isEmpty) {
  //kInMsg.repartition(8)
  val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))
  outdata.foreachRDD((rdd, time) => {
    rdd.foreach(rec => {
      writer.output(rec)
    })
  })
}

ssc.start()
ssc.awaitTermination()

In terms of total delay, with a 5 second batch, the delays usually stay under 5 seconds, but sometimes jump to ~10 seconds. As a performance tuning question, does this mean I can reduce my cleaner ttl from 60 to, say, 25 (still more than double the peak delay)?

Thanks

Tim
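One note on the repartition calls in this thread: DStream.repartition returns a new DStream rather than modifying the stream in place, so its result has to be what the downstream operators are applied to. A minimal sketch, reusing the names from the code above:

// assign the repartitioned stream and map over that, not over kInMsg
val repartitioned = kInMsg.repartition(64)
val outdata = repartitioned.map(x => normalizeLog(x._2, configMap))

If repartition is invoked without using its return value, the subsequent map still runs on the original stream's partitions, which would match the symptom of only the two receiving nodes doing work.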
Re: Spark / Thrift / ODBC connectivity
You can use the Thrift server to access Hive tables located in a legacy Hive warehouse and/or those generated by Spark SQL. Simba provides a Spark SQL ODBC driver that enables applications like Tableau, but right now I'm not 100% sure whether the driver has officially been released yet.

On Thu, Aug 28, 2014 at 9:42 PM, Denny Lee denny.g@gmail.com wrote:

I'm currently using the Spark 1.1 branch and have been able to get the Thrift service up and running. The quick questions were whether I should be able to use the Thrift service to connect to SparkSQL generated tables and/or Hive tables? As well, by any chance do we have any documents that point to how we can connect something like Tableau to Spark SQL Thrift - similar to the SAP ODBC connectivity http://www.saphana.com/docs/DOC-472?

Thanks!
Denny
how can I get the number of cores
Hi all,

The Spark web UI gives me information about total cores and used cores. I want to get this information programmatically. How can I do this?

Thanks
Kevin
Re: Spark SQL : how to find element where a field is in a given set
To pass a list to a variadic function you can use the type ascription :_*

For example:

val longList = Seq[Expression]("a", "b", ...)
table("src").where('key in (longList: _*))

Also, note that I had to explicitly specify Expression as the type parameter of Seq to ensure that the compiler converts "a" and "b" into Spark SQL expressions.

On Thu, Aug 28, 2014 at 11:52 PM, Jaonary Rabarisoa jaon...@gmail.com wrote:

ok, but what if I have a long list? Do I need to hard-code every element of my list like this, or is there a function that translates a list into a tuple?

On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust mich...@databricks.com wrote:

You don't need the Seq, as in is a variadic function.

personTable.where('name in ("foo", "bar"))

On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

Hi all,

What is the expression that I should use with the Spark SQL DSL if I need to retrieve data with a field in a given set? For example, I have the following schema:

case class Person(name: String, age: Int)

And I need to do something like:

personTable.where('name in Seq("foo", "bar")) ?

Cheers,
Jaonary
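Putting the two snippets together, a minimal sketch of filtering by an arbitrarily long list. This assumes the Spark SQL DSL implicits are in scope (e.g. via import sqlContext._) so that plain strings convert to literal expressions, as Michael describes:

import org.apache.spark.sql.catalyst.expressions.Expression

val names = List("foo", "bar", "baz")
val nameExprs: Seq[Expression] = names.map(n => n: Expression) // implicit String -> Literal
personTable.where('name in (nameExprs: _*))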
Re: u'' notation with pyspark output data
u'14.0' means a unicode string; you can convert it into str with u'14.0'.encode('utf8'), or you can convert it into a float with float(u'14.0').

Davies

On Thu, Aug 28, 2014 at 11:22 PM, Oleg Ruchovets oruchov...@gmail.com wrote:

Hi,

I am working with pyspark and doing a simple aggregation:

def doSplit(x):
    y = x.split(',')
    if len(y) == 3:
        return y[0], (y[1], y[2])

counts = lines.map(doSplit).groupByKey()
output = counts.collect()

Iterating over output I get data in this format: u'1385501280', u'14.0', but I actually need to work with 14 instead of u'14.0' and 1385501280 instead of u'1385501280'.

Question: how do I get the actual data without the u'' notation?

Thanks
Oleg.
Ensuring object in spark streaming runs on specific node
Say you have a spark streaming setup such as:

JavaReceiverInputDStream<...> rndLists = jssc.receiverStream(new JavaRandomReceiver(...));

rndLists.map(new NeuralNetMapper(...))
        .foreach(new JavaSyncBarrier(...));

Is there any way of ensuring that, say, a JavaRandomReceiver and JavaSyncBarrier get distributed to the same node? Or is this even a question that makes sense?

Some information as to how spark-streaming distributes work across a cluster would also be greatly appreciated.

(I've also asked this question on Stack Overflow at http://stackoverflow.com/questions/25564356/ensuring-object-in-spark-streaming-runs-on-specific-node)
Running Spark On Yarn without Spark-Submit
Hi,

My requirement is to run Spark on YARN without using the script spark-submit.

I have a servlet and a tomcat server. As and when a request comes, it creates a new SC and keeps it alive for further requests. I am setting my master in sparkConf as

sparkConf.setMaster("yarn-cluster")

but the request is stuck indefinitely. This works when I set

sparkConf.setMaster("yarn-client")

I am not sure why it is not launching the job in yarn-cluster mode. Any thoughts?

Thanks and Regards,
Archit Thakur.
Re: how to specify columns in groupby
Thank you Yanbo for the reply. I have another query, related to cogroup. I want to iterate over the results of a cogroup operation. My code is:

grp = RDD1.cogroup(RDD2)
map((lambda (x, y): (x, list(y[0]), list(y[1]))), list(grp))

My result looks like:

[((u'764', u'20140826'), [0.70146274566650391], [ ]),
 ((u'863', u'20140826'), [0.368011474609375], [ ]),
 ((u'9571520', u'20140826'), [0.0046129226684570312], [0.60009])]

When I do one more cogroup operation, like grp1 = grp.cogroup(RDD3), I am not able to see the results. All my RDDs are of the form ((x,y),z). Can somebody help me to solve this?

Thanks Regards,
Meethu M

On Thursday, 28 August 2014 5:59 PM, Yanbo Liang yanboha...@gmail.com wrote:

For your reference:

val d1 = textFile.map(line => {
  val fields = line.split(",")
  ((fields(0), fields(1)), fields(2).toDouble)
})
val d2 = d1.reduceByKey(_ + _)
d2.foreach(println)

2014-08-28 20:04 GMT+08:00 MEETHU MATHEW meethu2...@yahoo.co.in:

Hi all,

I have an RDD which has values in the format "id,date,cost". I want to group the elements based on the id and date columns and get the sum of the cost for each group. Can somebody tell me how to do this?

Thanks Regards,
Meethu M
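On the cogroup question: each cogroup wraps the values in another level of iterables, so the result of cogrouping an already-cogrouped RDD is easy to misread as empty. A minimal Scala sketch of the types involved (the pyspark behavior is analogous; the sample values follow the thread):

import org.apache.spark.SparkContext._

val a = sc.parallelize(Seq((("764", "20140826"), 0.701), (("863", "20140826"), 0.368)))
val b = sc.parallelize(Seq((("764", "20140826"), 0.600)))

// grp: RDD[((String, String), (Iterable[Double], Iterable[Double]))]
val grp = a.cogroup(b)

val c = sc.parallelize(Seq((("764", "20140826"), 1.0)))
// the values of grp1 nest one level deeper:
// (Iterable[(Iterable[Double], Iterable[Double])], Iterable[Double])
val grp1 = grp.cogroup(c)
grp1.collect().foreach { case (k, (xs, ys)) =>
  println((k, xs.map { case (p, q) => (p.toList, q.toList) }.toList, ys.toList))
}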
Re: Spark SQL : how to find element where a field is in a given set
Still not working for me. I get a compilation error: value in is not a member of Symbol. Any ideas?

On Fri, Aug 29, 2014 at 9:46 AM, Michael Armbrust mich...@databricks.com wrote:

To pass a list to a variadic function you can use the type ascription :_*

For example:

val longList = Seq[Expression]("a", "b", ...)
table("src").where('key in (longList: _*))

Also, note that I had to explicitly specify Expression as the type parameter of Seq to ensure that the compiler converts "a" and "b" into Spark SQL expressions.

On Thu, Aug 28, 2014 at 11:52 PM, Jaonary Rabarisoa jaon...@gmail.com wrote:

ok, but what if I have a long list? Do I need to hard-code every element of my list like this, or is there a function that translates a list into a tuple?

On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust mich...@databricks.com wrote:

You don't need the Seq, as in is a variadic function.

personTable.where('name in ("foo", "bar"))

On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

Hi all,

What is the expression that I should use with the Spark SQL DSL if I need to retrieve data with a field in a given set? For example, I have the following schema:

case class Person(name: String, age: Int)

And I need to do something like:

personTable.where('name in Seq("foo", "bar")) ?

Cheers,
Jaonary
Re: Running Spark On Yarn without Spark-Submit
including user@spark.apache.org.

On Fri, Aug 29, 2014 at 2:03 PM, Archit Thakur archit279tha...@gmail.com wrote:

Hi,

My requirement is to run Spark on YARN without using the script spark-submit.

I have a servlet and a tomcat server. As and when a request comes, it creates a new SC and keeps it alive for further requests. I am setting my master in sparkConf as

sparkConf.setMaster("yarn-cluster")

but the request is stuck indefinitely. This works when I set

sparkConf.setMaster("yarn-client")

I am not sure why it is not launching the job in yarn-cluster mode. Any thoughts?

Thanks and Regards,
Archit Thakur.
Re: How to debug this error?
It's not allowed to use an RDD inside a map function. RDDs can only be operated on at the driver of a Spark program. In your case, the group RDD can't be found on the executors. I guess you want to implement a subquery-like operation; try RDD.intersection() or join().

2014-08-29 12:43 GMT+08:00 Gary Zhao garyz...@gmail.com:

Hello

I'm new to Spark and playing around, but saw the following error. Could anyone help on it? Thanks.

Gary

scala> c
res15: org.apache.spark.rdd.RDD[String] = FlatMappedRDD[7] at flatMap at <console>:23

scala> group
res16: org.apache.spark.rdd.RDD[(String, Iterable[String])] = MappedValuesRDD[5] at groupByKey at <console>:19

val d = c.map(i => group.filter(_._1 == i))
d.first

14/08/29 04:39:33 INFO TaskSchedulerImpl: Cancelling stage 28
14/08/29 04:39:33 INFO DAGScheduler: Failed to run first at <console>:28
org.apache.spark.SparkException: Job aborted due to stage failure: Task 28.0:180 failed 4 times, most recent failure: Exception failure in TID 3605 on host mcs-spark-slave1-staging.snc1: java.lang.NullPointerException
org.apache.spark.rdd.RDD.filter(RDD.scala:282)
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:25)
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:25)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
scala.collection.AbstractIterator.to(Iterator.scala:1157)
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
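A minimal sketch of the join-based alternative suggested above, reusing the RDD names from the shell session (this filters group down to the keys that occur in c without nesting RDDs inside a map):

import org.apache.spark.SparkContext._ // PairRDD functions like join

// c: RDD[String], group: RDD[(String, Iterable[String])]
val keyed = c.map(k => (k, ()))
val d = keyed.join(group).map { case (k, (_, vs)) => (k, vs) }
d.first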
RE: The concurrent model of spark job/stage/task
Hi,

I think an example will help illustrate the model better.

/*** SimpleApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "$YOUR_SPARK_HOME/README.md"
    val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME",
      List("target/scala-2.10/simple-project_2.10-1.0.jar"))
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).filter(line => line.contains("c")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

The example's DAG graph corresponds to your graph. Let's see how it works:

1. val sc = new SparkContext(...) // This line creates the SparkContext (which is the driver).
2. val numAs = logData.filter(line => line.contains("a")).filter(line => line.contains("c")).count() // This is a job with 2 transformations and 1 action.
3. val numBs = logData.filter(line => line.contains("b")).count() // This is another job with 1 transformation and 1 action.

Remember Scala's LAZY calculation strategy. The job numAs will be calculated by invoking the count() method. It has 3 stages: FilteredRDD(1) -> FilteredRDD(2) -> RDD.count()

(1) RDD.count() will submit it as the Final Stage to the DAGScheduler.
(2) The DAGScheduler analyses the dependency chain and asks the RDD's parent, FilteredRDD(2), to be computed first, and FilteredRDD(2) will ask its parent, FilteredRDD(1), to be computed first. FilteredRDD(1) is the first, so it will be computed.
(3) Then the DAGScheduler wraps FilteredRDD(1) as a TaskSet and submits the TaskSet to TaskSchedulerImpl.
(4) Then TaskSchedulerImpl will schedule the TaskSet by the FIFO or FAIR strategy.
(5) The tasks in the TaskSet will be distributed to different Executors.
(6) After all the tasks of this TaskSet have finished, this Stage is marked finished. (RDDs will be cached by the BlockStore; RDD data can be shared within this SparkContext. If you have a job numCs, val numCs = logData.filter(line => line.contains("a")).filter(line => line.contains("d")).count(), the first filter(line => line.contains("a")) can reuse the RDD data computed in numAs.)
(7) Then FilteredRDD(2) will be computed, then RDD.count().
(8) Finally you have the result for numAs.

I think you now understand the submit/schedule/run process. Let's look at the questions:

1. Each DAG graph is related to 1 action. You can write multiple actions in a Spark application. If you want these actions to run simultaneously, you have to submit these actions in different threads.
2. I think you should pay attention to the FIFO or FAIR scheduler strategy. If the first action is too large, maybe the second action will be starved.
3. I think the question is how to persist the RDD data to local disk? You could use saveAsTextFile(path) or saveAsSequenceFile(path) to persist RDD data to local disk.

Hope this will help you.

Best regards,
Patrick Liu

Date: Thu, 28 Aug 2014 23:34:29 -0700
From: ml-node+s1001560n13104...@n3.nabble.com
To: linkpatrick...@live.com
Subject: Re: RE: The concurrent model of spark job/stage/task

hi, dear

Thanks for the response. Some comments below. And yes, I am using Spark on YARN.

1. The release doc of Spark says multiple jobs can be submitted in one application if the jobs (actions) are submitted by different threads. I wrote some Java thread code in the driver, one action in each thread, and the stages run concurrently, which I observed on the stages UI. In my understanding the DAGScheduler generates a different graph for each action.
Not sure if that is correct. Originally I was hoping the SparkContext could generate different jobs for unrelated actions, but I never got that to work.

2. If the DAGScheduler generates a graph as below, can 1 and 2 run concurrently?

3. I want to retrieve the original data out of the RDD and run other computation on that data, like getting the value of temperature or other fields and working on them.

[hidden email]

From: [hidden email]
Date: 2014-08-29 14:01
To: [hidden email]
Subject: RE: The concurrent model of spark job/stage/task

Hi,

Please see the answers following each question. If there's any mistake, please let me know. Thanks!

I am not sure which mode you are running, so I will assume you are using the spark-submit script to submit Spark applications to a Spark cluster (spark-standalone or YARN).

1. how to start 2 or more jobs in one spark driver, in java code.. I wrote 2 actions in the code, but the job still staged in index 0, 1, 2, 3... looks they run sequentially.

A Spark application is a job; you init the application by creating a SparkContext. The SparkContext will init the driver program for you. So if you want to run multiple jobs simultaneously, you have to split the jobs into different applications and submit each of them. The driver program is like an ApplicationMaster in YARN. It translates the Spark application into a DAG graph and schedules each stage to workers.
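A note on point 2 above (FIFO vs FAIR): a minimal sketch of opting into the fair scheduler so one large job does not starve another, using the documented spark.scheduler.* settings from the job-scheduling page cited earlier (the pool name is an example):

val conf = new SparkConf()
  .setAppName("FairScheduling")
  .set("spark.scheduler.mode", "FAIR") // the default is FIFO
val sc = new SparkContext(conf)
// jobs submitted from this thread are assigned to the named pool
sc.setLocalProperty("spark.scheduler.pool", "pool1")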
Re: Spark Hive max key length is 767 bytes
Hi,

I tried the same thing in Hive directly without issue:

HIVE:
hive> create table test_datatype2 (testbigint bigint);
OK
Time taken: 0.708 seconds
hive> drop table test_datatype2;
OK
Time taken: 23.272 seconds

Then tried again in SPARK:

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
14/08/29 19:33:52 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@395c7b94

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:104
== Query Plan ==
Native command: executed by Hive

scala> hiveContext.hql("drop table test_datatype3")
14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown while adding/validating class(es) : Specified key was too long; max key length is 767 bytes
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted in no possible candidates
Error(s) were found while auto-creating/validating the datastore for classes. The errors are printed in the log, and are attached to this exception.
org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found while auto-creating/validating the datastore for classes. The errors are printed in the log, and are attached to this exception.
at org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609)
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table.
14/08/29 19:34:25 ERROR DataNucleus.Datastore: An exception was thrown while adding/validating class(es) : Specified key was too long; max key length is 767 bytes
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

Can anyone please help?

Regards
Arthur

On 29 Aug, 2014, at 12:47 pm, arthur.hk.c...@gmail.com arthur.hk.c...@gmail.com wrote:

(Please ignore if duplicated)

Hi,

I use Spark 1.0.2 with Hive 0.13.1. I have already set the Hive MySQL database to latin1:

mysql: alter database hive character set latin1;

Spark:
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> hiveContext.hql("create table test_datatype1 (testbigint bigint)")
scala> hiveContext.hql("drop table test_datatype1")

14/08/29 12:31:55 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table.
14/08/29 12:31:55 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table.
14/08/29 12:31:55 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table.
14/08/29 12:31:55 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table.
14/08/29 12:31:59 ERROR DataNucleus.Datastore: An exception was thrown while
Re: how to filter value in spark
I see, it works well, thank you!!!

But in the following situation, how would I do it:

var a = sc.textFile("/sparktest/1/").map((_, "a"))
var b = sc.textFile("/sparktest/2/").map((_, "b"))

How do I get (3,a) and (4,a)?

On Aug 28, 2014, at 19:54, Matthew Farrellee m...@redhat.com wrote:

On 08/28/2014 07:20 AM, marylucy wrote:

fileA = 1 2 3 4, one number a line, saved in /sparktest/1/
fileB = 3 4 5 6, one number a line, saved in /sparktest/2/
I want to get 3 and 4

var a = sc.textFile("/sparktest/1/").map((_, 1))
var b = sc.textFile("/sparktest/2/").map((_, 1))
a.filter(param => { b.lookup(param._1).length > 0 }).map(_._1).foreach(println)

Error thrown: Scala.MatchError: Null PairRDDFunctions.lookup...

the issue is nesting of the b rdd inside a transformation of the a rdd

consider using intersection, it's more idiomatic

a.intersection(b).foreach(println)

but note that intersection will remove duplicates

best,

matt
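For the keyed follow-up, a join keeps only the keys present in both RDDs while preserving a's values. A minimal sketch, assuming the definitions above:

import org.apache.spark.SparkContext._ // for PairRDD functions like join

val a = sc.textFile("/sparktest/1/").map((_, "a"))
val b = sc.textFile("/sparktest/2/").map((_, "b"))
val inBoth = a.join(b).map { case (k, (va, _)) => (k, va) } // yields (3,a) and (4,a)
inBoth.foreach(println)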
Re: Running Spark On Yarn without Spark-Submit
Archit,

We are using yarn-cluster mode, calling Spark via the Client class directly from a servlet server. It works fine.

To establish a communication channel for further requests: it should be possible with yarn-client, but not with yarn-cluster. In yarn-client mode, the Spark driver is outside the YARN cluster, so it can issue more commands. In yarn-cluster mode, all programs, including the Spark driver, run inside the YARN cluster, and there is no communication channel with the client until the job finishes. If your job is to keep the SparkContext alive and wait for other commands, then it would wait forever.

I am actually working on some improvements to this and experimenting in our product. I will create PRs when I feel comfortable with the solution:

1) Change the Client API to allow the caller to know the YARN app resource capacity before passing arguments
2) Add a YarnApplicationListener to the Client
3) Provide a communication channel between the application and the Spark YARN client in the cluster

#1 is not directly related to the communication discussed here. #2 allows the application to have application life cycle callbacks (app start, end, in progress, failure, etc.) along with YARN resource allocations. I changed #1 and #2 in a forked Spark, and it has worked well on CDH5; I am testing against 2.0.5-alpha as well.

For #3 I did not change Spark itself, as I am not sure of the best approach yet. I put the change in the application runner which launches the Spark YARN client in the cluster. The runner in the YARN cluster gets the application's host and port information from the passed configuration (args), then creates an Akka actor using the Spark context actor system and sends a handshake message to the caller outside the cluster; after that you have two-way communication. With this approach, I can send Spark listener callbacks to the app, error messages, app-level messages, etc. The runner inside the cluster can also receive requests from outside the cluster, such as stop.

We are not sure the Akka approach is the best, so I am still experimenting with it. So far it does what we want.

Hope this helps,
Chester

Sent from my iPhone

On Aug 29, 2014, at 2:36 AM, Archit Thakur archit279tha...@gmail.com wrote:

including user@spark.apache.org.

On Fri, Aug 29, 2014 at 2:03 PM, Archit Thakur archit279tha...@gmail.com wrote:

Hi,

My requirement is to run Spark on YARN without using the script spark-submit.

I have a servlet and a tomcat server. As and when a request comes, it creates a new SC and keeps it alive for further requests. I am setting my master in sparkConf as

sparkConf.setMaster("yarn-cluster")

but the request is stuck indefinitely. This works when I set

sparkConf.setMaster("yarn-client")

I am not sure why it is not launching the job in yarn-cluster mode. Any thoughts?

Thanks and Regards,
Archit Thakur.
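A minimal sketch of the embedded approach the thread confirms does work, keeping a long-lived SparkContext in yarn-client mode inside the servlet process (the object and names are illustrative, not from the thread):

import org.apache.spark.{SparkConf, SparkContext}

object EmbeddedSpark {
  // built once at servlet init and kept alive for later requests
  lazy val sc: SparkContext = {
    val conf = new SparkConf()
      .setAppName("ServletBackedSpark")
      .setMaster("yarn-client") // yarn-cluster would place the driver inside the
                                // cluster, out of reach of the servlet, as Chester notes
    new SparkContext(conf)
  }
}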
Re: Where to save intermediate results?
Hi Daniel,

Your suggestion is definitely an interesting approach. In fact, I already have another system to deal with the stream analytical processing part. So basically, the Spark job to aggregate data just cumulatively computes aggregations from historical data together with each new batch, which has been partly summarized by the stream processor. Answering queries involves combining pre-calculated historical data with on-stream aggregations. This sounds much like what Spark Streaming is intended to do, so I'll take a deeper look at Spark Streaming to consider porting the stream processing part to it.

Regarding saving pre-calculated data to external storage (disk, database...), I'm looking at Cassandra for now. But I don't know how it fits into my context, or how its performance compares to saving files in HDFS. Also, is there any way to keep the pre-calculated data both on disk and in memory, so that when the batch job terminates, historical data is still available in memory for combining with the stream processor, while still being able to survive system failure or upgrade? Not to mention the size of the pre-calculated data might get too big; in that case, keeping only the newest data in memory would be better. Tachyon looks like a nice option, but again, I don't have experience with it and it's still an experimental feature of Spark.

Regards,

Huy
Re: how can I get the number of cores
What version of Spark are you running? Try calling sc.defaultParallelism. I've found that it is typically set to the number of worker cores in your cluster.

On Fri, Aug 29, 2014 at 3:39 AM, Kevin Jung itsjb.j...@samsung.com wrote:

Hi all,

The Spark web UI gives me information about total cores and used cores. I want to get this information programmatically. How can I do this?

Thanks
Kevin
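A minimal sketch. Note that defaultParallelism is a scheduler hint that usually tracks the total core count rather than a guaranteed value, and the getExecutorMemoryStatus call is an assumption about what your Spark version exposes:

val sc = new SparkContext(new SparkConf().setAppName("CoreInfo"))
println("default parallelism: " + sc.defaultParallelism)
// rough executor count; the returned map includes the driver as well
println("executors (incl. driver): " + sc.getExecutorMemoryStatus.size)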
Re: Spark Streaming checkpoint recovery causes IO re-execution
I understand that the DB writes are happening from the workers unless you collect. My confusion is that you believe workers recompute on recovery ("node computations which get redone upon recovery"). My understanding is that checkpointing dumps the RDD to disk and then cuts the RDD lineage. So I thought that on driver restart you'd get a set of new executor processes, but they would read the last known state of the RDD from the HDFS checkpoint. Am I off here? So the only situation I can imagine where you end up recomputing is if you're checkpointing at a larger interval than your batch size (i.e. the RDD on disk does not reflect its last pre-crash state)?

On Thu, Aug 28, 2014 at 1:32 PM, RodrigoB rodrigo.boav...@aspect.com wrote:

Hi Yana,

The fact is that the DB writing is happening at the node level and not at the Spark level. One of the benefits of the distributed computing nature of Spark is enabling IO distribution as well. For example, it is much faster to have the nodes write to Cassandra than to have everything collected at the driver level and the writes sent from there.

The problem is the node computations that get redone upon recovery. If these lambda functions send events to other systems, those events would get resent upon re-computation, causing overall system instability.

Hope this helps you understand the problem.

tnks,
Rod
Re: Spark webUI - application details page
How did you specify the HDFS path? When I put

spark.eventLog.dir hdfs://crosby.research.intel-research.net:54310/tmp/spark-events

in my spark-defaults.conf file, I receive the following error:

An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.io.IOException: Call to crosby.research.intel-research.net/10.212.84.53:54310 failed on local exception: java.io.EOFException

-Brad

On Thu, Aug 28, 2014 at 12:26 PM, SK skrishna...@gmail.com wrote:

I was able to recently solve this problem for standalone mode. For this mode, I did not use a history server. Instead, I set spark.eventLog.dir (in conf/spark-defaults.conf) to a directory in HDFS (basically this directory should be in a place that is writable by the master and globally accessible to all the nodes).
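The same settings can also be applied per application instead of via spark-defaults.conf. A minimal sketch, reusing the HDFS URI from this thread:

val conf = new SparkConf()
  .setAppName("HistoryEnabledApp")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs://crosby.research.intel-research.net:54310/tmp/spark-events")
val sc = new SparkContext(conf)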
Spark Streaming reset state
Hi all,

I would like to ask some advice about resetting a Spark stateful operation. I tried like this:

JavaStreamingContext jssc = new JavaStreamingContext(context, new Duration(5000));
jssc.remember(new Duration(5 * 60 * 1000));
jssc.checkpoint(ApplicationConstants.HDFS_STREAM_DIRECTORIES);

JavaPairReceiverInputDStream<String, String> messages =
    (JavaPairReceiverInputDStream<String, String>) KafkaUtils.createStream(jssc,
        "localhost:2181", "test-consumer-group", topicMap);
JavaPairDStream<String, String> windowed = messages.window(WINDOW_LENGTH, SLIDE_INTERVAL);
JavaDStream<LogEntry> lines = windowed.map(new Function<Tuple2<String, String>, LogEntry>() {
    @Override
    public LogEntry call(Tuple2<String, String> tuple2) {
        LogEntry _Result = Utils.parseLine(tuple2._2());
        return _Result;
    }
}).filter(Functions.FILTER_LOG_ENTRY).cache();
JavaPairDStream<String, Long> codes = lines.mapToPair(Functions.GET_CODE)
    .reduceByKey(Functions.SUM_REDUCER)
    .updateStateByKey(COMPUTE_RUNNING_SUM);

I thought that by setting remember to 5 minutes, the codes RDD derived from messages would also be reset after 5 minutes, but in fact no. Is there any way to reset the codes RDD after a period of time (5 minutes)?

Thanks

--
Best Regards,
Eko Susilo
Re: Low Level Kafka Consumer for Spark
Chris,

I did the Dstream.repartition mentioned in the document on parallelism in receiving, as well as setting spark.default.parallelism, and it still uses only 2 nodes in my cluster. I notice there is another email thread on the same topic:

http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html

My code is in Java and here is what I have:

JavaPairReceiverInputDStream<String, String> messages =
    KafkaUtils.createStream(ssc, zkQuorum, "cse-job-play-consumer", kafkaTopicMap);

JavaPairDStream<String, String> newMessages =
    messages.repartition(partitionSize); // partitionSize=30

JavaDStream<String> lines = newMessages.map(new Function<Tuple2<String, String>, String>() {
    ...
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

JavaDStream<String> words = lines.flatMap(new MetricsComputeFunction());

JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
    new PairFunction<String, String, Integer>() {
        ...
    }
);

wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {...});

Thanks,

Bharat
/tmp/spark-events permissions problem
Hi All,

Yesterday I restarted my cluster, which had the effect of clearing /tmp. When I brought Spark back up and ran my first job, /tmp/spark-events was re-created and the job ran fine. I later learned that other users were receiving errors when trying to create a Spark context. It turned out the reason was that only my user was able to create subdirectories within /tmp/spark-events. I believe /tmp/spark-events originally had ownership bmiller1:bmiller1 (where bmiller1 is my username) with permissions 770. Once I modified the permissions to allow other users to create subdirectories, other users were again able to launch jobs.

Note that I think this may be related to some problems I am having viewing application history (see link):
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-td3490.html#a13130

Has anybody else experienced a problem with permissions on the spark.eventLog.dir directory?

best,
-Brad
Re: Spark SQL : how to find element where a field is in a given set
What version are you using?

On Fri, Aug 29, 2014 at 2:22 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

Still not working for me. I get a compilation error: value in is not a member of Symbol. Any ideas?

On Fri, Aug 29, 2014 at 9:46 AM, Michael Armbrust mich...@databricks.com wrote:

To pass a list to a variadic function you can use the type ascription :_*

For example:

val longList = Seq[Expression]("a", "b", ...)
table("src").where('key in (longList: _*))

Also, note that I had to explicitly specify Expression as the type parameter of Seq to ensure that the compiler converts "a" and "b" into Spark SQL expressions.

On Thu, Aug 28, 2014 at 11:52 PM, Jaonary Rabarisoa jaon...@gmail.com wrote:

ok, but what if I have a long list? Do I need to hard-code every element of my list like this, or is there a function that translates a list into a tuple?

On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust mich...@databricks.com wrote:

You don't need the Seq, as in is a variadic function.

personTable.where('name in ("foo", "bar"))

On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

Hi all,

What is the expression that I should use with the Spark SQL DSL if I need to retrieve data with a field in a given set? For example, I have the following schema:

case class Person(name: String, age: Int)

And I need to do something like:

personTable.where('name in Seq("foo", "bar")) ?

Cheers,
Jaonary
Re: Spark Streaming reset state
codes is a DStream, not an RDD. The remember() method controls how long Spark Streaming holds on to the RDDs themselves.

Can you clarify what you mean by "reset"? codes provides a stream of RDDs that contain your computation over a window of time. New RDDs come with the computation over new data.

On Fri, Aug 29, 2014 at 4:30 PM, Eko Susilo eko.harmawan.sus...@gmail.com wrote:

Hi all,

I would like to ask some advice about resetting a Spark stateful operation. I tried like this:

JavaStreamingContext jssc = new JavaStreamingContext(context, new Duration(5000));
jssc.remember(new Duration(5 * 60 * 1000));
jssc.checkpoint(ApplicationConstants.HDFS_STREAM_DIRECTORIES);

JavaPairReceiverInputDStream<String, String> messages =
    (JavaPairReceiverInputDStream<String, String>) KafkaUtils.createStream(jssc,
        "localhost:2181", "test-consumer-group", topicMap);
JavaPairDStream<String, String> windowed = messages.window(WINDOW_LENGTH, SLIDE_INTERVAL);
JavaDStream<LogEntry> lines = windowed.map(new Function<Tuple2<String, String>, LogEntry>() {
    @Override
    public LogEntry call(Tuple2<String, String> tuple2) {
        LogEntry _Result = Utils.parseLine(tuple2._2());
        return _Result;
    }
}).filter(Functions.FILTER_LOG_ENTRY).cache();
JavaPairDStream<String, Long> codes = lines.mapToPair(Functions.GET_CODE)
    .reduceByKey(Functions.SUM_REDUCER)
    .updateStateByKey(COMPUTE_RUNNING_SUM);

I thought that by setting remember to 5 minutes, the codes RDD derived from messages would also be reset after 5 minutes, but in fact no. Is there any way to reset the codes RDD after a period of time (5 minutes)?

Thanks

--
Best Regards,
Eko Susilo
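If the goal is to drop the accumulated state rather than the RDDs, one common pattern is to have the updateStateByKey function itself return None for keys that should be cleared. A hedged Scala sketch, not from the thread; the "drop a key when no new values arrive" rule and the getCode name are assumptions standing in for the Java Functions.* helpers:

import org.apache.spark.streaming.StreamingContext._ // pair DStream implicits in 1.x

// state: a running sum per code; returning None removes that key's state entirely
val updateFunc = (newValues: Seq[Long], state: Option[Long]) =>
  if (newValues.isEmpty) None // no fresh data for this key in this batch: reset it
  else Some(state.getOrElse(0L) + newValues.sum)

val codes = lines.map(getCode).reduceByKey(_ + _).updateStateByKey(updateFunc)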
Re: Low Level Kafka Consumer for Spark
'this 2-node replication is mainly for failover in case the receiver dies while data is in flight. there's still chance for data loss as there's no write ahead log on the hot path, but this is being addressed.' Can you comment a little on how this will be addressed? Will there be a durable WAL? Is there a JIRA for tracking this effort? I am curious whether, without a WAL, you can avoid this data loss with explicit management of Kafka offsets, e.g. don't commit an offset unless the data is replicated to multiple nodes, or maybe not until processed. The incoming data will always be durably stored to disk in Kafka, so it can be replayed in failure scenarios to avoid data loss if the offsets are managed properly. On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly ch...@fregly.com wrote: @bharat- overall, i've noticed a lot of confusion about how Spark Streaming scales - as well as how it handles failover and checkpointing, but we can discuss that separately. there's actually 2 dimensions to scaling here: receiving and processing. *Receiving* receiving can be scaled out by submitting new DStreams/Receivers to the cluster as i've done in the Kinesis example. in fact, i purposely chose to submit multiple receivers in my Kinesis example because i feel it should be the norm and not the exception - particularly for partitioned and checkpoint-capable streaming systems like Kafka and Kinesis. it's the only way to scale. a side note here is that each receiver running in the cluster will immediately replicate to 1 other node for fault-tolerance of that specific receiver. this is where the confusion lies. this 2-node replication is mainly for failover in case the receiver dies while data is in flight. there's still chance for data loss as there's no write ahead log on the hot path, but this is being addressed. this is mentioned in the docs here: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving *Processing* once data is received, tasks are scheduled across the Spark cluster just like any other non-streaming task where you can specify the number of partitions for reduces, etc. this is the part of scaling that is sometimes overlooked - probably because it works just like regular Spark, but it is worth highlighting. Here's a blurb in the docs: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing the other thing that's confusing with Spark Streaming is that in Scala, you need to explicitly import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions in order to pick up the implicits that allow DStream.reduceByKey and such (versus DStream.transform(rddBatch => rddBatch.reduceByKey())). in other words, DStreams appear to be relatively featureless until you discover this implicit. otherwise, you need to operate on the underlying RDDs explicitly, which is not ideal. the Kinesis example referenced earlier in the thread uses the DStream implicits. side note to all of this - i've recently convinced my publisher for my upcoming book, Spark In Action, to let me jump ahead and write the Spark Streaming chapter ahead of other more well-understood libraries. early release is in a month or so. sign up @ http://sparkinaction.com if you wanna get notified. shameless plug that i wouldn't otherwise do, but i really think it will help clear a lot of confusion in this area as i hear these questions asked a lot in my talks and such.
and i think a clear, crisp story on scaling and fault-tolerance will help Spark Streaming's adoption. hope that helps! -chris On Wed, Aug 27, 2014 at 6:32 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: I agree. This issue should be fixed in Spark rather than rely on replay of Kafka messages. Dib On Aug 28, 2014 6:45 AM, RodrigoB rodrigo.boav...@aspect.com wrote: Dibyendu, Thanks for getting back. I believe you are absolutely right. We were under the assumption that the raw data was being computed again, and that's not happening after further tests. This applies to Kafka as well. The issue is of major priority, unfortunately. Regarding your suggestion, I would maybe prefer to have the problem resolved within Spark's internals, since once the data is replicated we should be able to access it once more and not have to pull it back again from Kafka or any other stream that is being affected by this issue. If, for example, there is a big amount of batches to be recomputed, I would rather have them done distributed than overload the batch interval with a huge amount of Kafka messages. I do not yet have enough know-how on where the issue is or about the internal Spark code, so I can't really say how difficult the implementation will be. Thanks, Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12966.html
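The implicit Chris mentions is worth a concrete illustration. A minimal sketch, assuming `words` is a DStream[String] that is already set up:

import org.apache.spark.streaming.StreamingContext._  // pair-DStream implicits

// With the implicit conversions in scope, key/value operations like
// reduceByKey become available directly on the DStream, instead of
// reaching for transform() on each underlying RDD batch.
val counts = words.map(w => (w, 1L)).reduceByKey(_ + _)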
Re: Spark Streaming reset state
So the codes stream currently holds RDDs containing codes and their respective counters. I would like to find a way to reset those RDDs after some period of time. On Fri, Aug 29, 2014 at 5:55 PM, Sean Owen so...@cloudera.com wrote: codes is a DStream, not an RDD. The remember() method controls how long Spark Streaming holds on to the RDDs itself. Clarify what you mean by reset? codes provides a stream of RDDs that contain your computation over a window of time. New RDDs come with the computation over new data. -- Best Regards, Eko Susilo
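One possible alternative, sketched here rather than a literal port of the Java code above: if the counts only ever need to cover the last five minutes, a sliding window expires old data on its own, which behaves like a periodic reset. Here extractCode stands in for Functions.GET_CODE:

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext._  // pair-DStream implicits

// Sum per code over a sliding 5-minute window; entries older than the
// window simply fall out, so no explicit state reset is needed.
val codes = lines
  .map(entry => (extractCode(entry), 1L))
  .reduceByKeyAndWindow(_ + _, Seconds(300), Seconds(5))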
RE: Q on downloading spark for standalone cluster
Hello Sparkies! Could anyone please answer this? This is not a Hadoop cluster, so which download option should I use for a standalone cluster? Also, what are the best practices if you have 1 TB of data and want to use Spark? Do you have to use Hadoop/CDH, or is there some other option? Appreciate it. From: Sagar, Sanjeev [mailto:sanjeev.sa...@mypointscorp.com] Sent: Thursday, August 28, 2014 2:44 PM To: Daniel Siegmann Cc: user@spark.apache.org Subject: RE: Q on downloading spark for standalone cluster Hello Daniel, If you're not using Hadoop then why would you want to grab the Hadoop package? CDH5 will download all the Hadoop packages and Cloudera Manager too. Just curious: if you start Spark on an EC2 cluster, what does it choose as the default data store? -Sanjeev From: Daniel Siegmann [mailto:daniel.siegm...@velos.io] Sent: Thursday, August 28, 2014 2:04 PM To: Sagar, Sanjeev Cc: user@spark.apache.org Subject: Re: Q on downloading spark for standalone cluster If you aren't using Hadoop, I don't think it matters which you download. I'd probably just grab the Hadoop 2 package. Out of curiosity, what are you using as your data store? I get the impression most Spark users are using HDFS or something built on top. On Thu, Aug 28, 2014 at 4:07 PM, Sanjeev Sagar sanjeev.sa...@mypointscorp.com wrote: Hello there, I've a basic question on the download: which option do I need to download for a standalone cluster? I've a private cluster of three machines on CentOS. When I click on download it shows me the following: Download Spark The latest release is Spark 1.0.2, released August 5, 2014 (release notes) http://spark.apache.org/releases/spark-release-1-0-2.html (git tag) https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=8fb6f00e195fb258f3f70f04756e07c259a2351f Pre-built packages:
* For Hadoop 1 (HDP1, CDH3): find an Apache mirror http://www.apache.org/dyn/closer.cgi/spark/spark-1.0.2/spark-1.0.2-bin-hadoop1.tgz or direct file download http://d3kbcqa49mib13.cloudfront.net/spark-1.0.2-bin-hadoop1.tgz
* For CDH4: find an Apache mirror http://www.apache.org/dyn/closer.cgi/spark/spark-1.0.2/spark-1.0.2-bin-cdh4.tgz or direct file download http://d3kbcqa49mib13.cloudfront.net/spark-1.0.2-bin-cdh4.tgz
* For Hadoop 2 (HDP2, CDH5): find an Apache mirror http://www.apache.org/dyn/closer.cgi/spark/spark-1.0.2/spark-1.0.2-bin-hadoop2.tgz or direct file download http://d3kbcqa49mib13.cloudfront.net/spark-1.0.2-bin-hadoop2.tgz
Pre-built packages, third-party (NOTE: may include non ASF-compatible licenses):
* For MapRv3: direct file download (external) http://package.mapr.com/tools/apache-spark/1.0.2/spark-1.0.2-bin-mapr3.tgz
* For MapRv4: direct file download (external) http://package.mapr.com/tools/apache-spark/1.0.2/spark-1.0.2-bin-mapr4.tgz
From the above it looks like I have to download Hadoop or CDH4 first in order to use Spark? I've a standalone cluster and my data size is in the hundreds of gigabytes, close to a terabyte. I don't get which one I need to download from the above list. Could someone tell me which one to download for a standalone cluster with a big data footprint? Or is Hadoop needed or mandatory for using Spark? That's not my understanding. My understanding is that you can use Spark with Hadoop via YARN if you like, but you can also use Spark standalone without Hadoop. Please assist. I'm confused!
-Sanjeev -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
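For what it's worth, a standalone cluster really does run with no Hadoop installed; a minimal sketch of a sanity check against a plain filesystem path (the path is a placeholder and must be visible at the same location on every worker, e.g. over NFS):

// Read from the local filesystem rather than HDFS; any storage that all
// workers can see at the same path will do for a standalone cluster.
val lines = sc.textFile("file:///data/events/*.log")
println(lines.count())

For data sizes approaching a terabyte, though, a distributed store such as HDFS is the common choice, since it gives data locality and avoids a single shared-disk bottleneck.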
Announce: Smoke - a web frontend to Spark
Hi everyone! I've been working on Smoke, a web frontend for interactively launching Spark jobs without compiling them (only Scala is supported right now, and jobs are launched in yarn-client mode). It works by executing the Scala script with spark-shell on the Spark server. It's developed in Python, uses Celery/Redis to stream the job logs to the web, and is easy to install. The project is on GitHub: https://github.com/data-tsunami/smoke (install instructions, screenshots, etc.). It's in an early development stage, but very usable. Thanks! Horacio -- Horacio G. de Oro Data Tsunami Email: hgde...@gmail.com Web: http://www.data-tsunami.com/english/ Cel: +54 9 3572 525359 LinkedIn: https://www.linkedin.com/in/hgdeoro
Re: Spark Hive max key length is 767 bytes
Spark SQL is based on Hive 12. They must have changed the maximum key size between 12 and 13. On Fri, Aug 29, 2014 at 4:38 AM, arthur.hk.c...@gmail.com arthur.hk.c...@gmail.com wrote: Hi, Tried the same thing in Hive directly without issue: HIVE:
hive> create table test_datatype2 (testbigint bigint);
OK Time taken: 0.708 seconds
hive> drop table test_datatype2;
OK Time taken: 23.272 seconds
Then tried again in Spark:
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
14/08/29 19:33:52 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@395c7b94
scala> hiveContext.hql("create table test_datatype3 (testbigint bigint)")
res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:104 == Query Plan == Native command: executed by Hive
scala> hiveContext.hql("drop table test_datatype3")
14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown while adding/validating class(es): Specified key was too long; max key length is 767 bytes
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted in no possible candidates Error(s) were found while auto-creating/validating the datastore for classes. The errors are printed in the log, and are attached to this exception.
org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found while auto-creating/validating the datastore for classes. The errors are printed in the log, and are attached to this exception.
at org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609)
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table.
14/08/29 19:34:25 ERROR DataNucleus.Datastore: An exception was thrown while adding/validating class(es): Specified key was too long; max key length is 767 bytes
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
Can anyone please help? Regards Arthur On 29 Aug, 2014, at 12:47 pm, arthur.hk.c...@gmail.com arthur.hk.c...@gmail.com wrote: (Please ignore if duplicated) Hi, I use Spark 1.0.2 with Hive 0.13.1. I have already set the Hive MySQL database to latin1:
mysql> alter database hive character set latin1;
Spark:
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> hiveContext.hql("create table test_datatype1 (testbigint bigint)")
scala> hiveContext.hql("drop table test_datatype1")
14/08/29 12:31:55 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table.
14/08/29 12:31:55 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table.
14/08/29 12:31:55 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table.
14/08/29 12:31:55 INFO DataNucleus.Datastore: The class
Re: Change delimiter when collecting SchemaRDD
Thanks Michael, that makes total sense. It works perfectly. Yadid On Thu, Aug 28, 2014 at 9:19 PM, Michael Armbrust mich...@databricks.com wrote: The comma is just the way the default toString works for Row objects. Since SchemaRDDs are also RDDs, you can do arbitrary transformations on the Row objects that are returned. For example, if you'd rather the delimiter was '|':
sql("SELECT * FROM src").map(_.mkString("|")).collect()
On Thu, Aug 28, 2014 at 7:58 AM, yadid ayzenberg ya...@media.mit.edu wrote: Hi All, Is there any way to change the delimiter from being a comma? Some of the strings in my data contain commas as well, making it very difficult to parse the results. Yadid
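The same trick extends to writing the result back out. A small sketch following Michael's example; the output path is a placeholder, and tab is assumed to be a delimiter that does not occur in the data:

// Rows are plain Row objects, so any String formatting applies before the save.
sql("SELECT * FROM src").map(_.mkString("\t")).saveAsTextFile("/tmp/src_tsv")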
Problem Accessing Hive Table from hiveContext
Hi All, New to Spark; using Spark 1.0.2 and Hive 0.12. If the Hive table is created as test_datatypes (testbigint bigint, ss bigint), then select * from test_datatypes works fine from Spark. For create table test_datatypes (testbigint bigint, testdec decimal(5,2)):
scala> val dataTypes = hiveContext.hql("select * from test_datatypes")
14/08/28 21:18:44 INFO parse.ParseDriver: Parsing command: select * from test_datatypes
14/08/28 21:18:44 INFO parse.ParseDriver: Parse Completed
14/08/28 21:18:44 INFO analysis.Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/08/28 21:18:44 INFO analysis.Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
java.lang.IllegalArgumentException: Error: ',', ':', or ';' expected at position 14 from 'bigint:decimal(5,2)' [0:bigint, 6::, 7:decimal, 14:(, 15:5, 16:,, 17:2, 18:)]
at org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.parseTypeInfos(TypeInfoUtils.java:312)
at org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfosFromTypeString(TypeInfoUtils.java:716)
at org.apache.hadoop.hive.serde2.lazy.LazyUtils.extractColumnInfo(LazyUtils.java:364)
at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initSerdeParams(LazySimpleSerDe.java:288)
at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:187)
at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:218)
at org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:272)
at org.apache.hadoop.hive.ql.metadata.Table.checkValidity(Table.java:175)
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:991)
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:924)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:58)
at org.apache.spark.sql.hive.HiveContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:143)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:122)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:122)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:122)
at org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:149)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$2.applyOrElse(Analyzer.scala:83)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$2.applyOrElse(Analyzer.scala:81)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:183)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:212)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:168)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156)
The same exception happens when the table is created as test_datatypes (testbigint bigint, testdate date). Thanks, Igor.
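The parse error comes from Spark 1.0.x bundling Hive 0.12, whose decimal type does not yet take a precision and scale (that arrived in Hive 0.13), so the metastore's 'decimal(5,2)' string cannot be parsed. A hedged workaround sketch until Spark moves to Hive 0.13: declare the column as a bare decimal (or a double) instead:

// Hive 0.12's decimal takes no (precision, scale) arguments, so this
// variant of the table should be readable from Spark 1.0.x.
hiveContext.hql("create table test_datatypes (testbigint bigint, testdec decimal)")
hiveContext.hql("select * from test_datatypes").collect()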
Re: Too many open files
Hi, I am having the same problem reported by Michael. I am trying to open 30 files. ulimit -n shows the limit is 1024, so I am not sure why the program is failing with the "Too many open files" error. The total size of all the 30 files is 230 GB. I am running the job on a cluster with 10 nodes, each having 16 GB. The error appears to be happening at the distinct() stage. Here is my program. In the following code, are all the 10 nodes trying to open all of the 30 files, or are the files distributed among the 10 nodes?
val baseFile = "/mapr/mapr_dir/files_2013apr*"
val x = sc.textFile(baseFile).map { line =>
  val fields = line.split("\t")
  (fields(11), fields(6))
}.distinct().countByKey()
val xrdd = sc.parallelize(x.toSeq)
xrdd.saveAsTextFile(...)
Instead of using the glob *, I guess I can try using a for loop to read the files one by one if that helps, but I am not sure if there is a more efficient solution. The following is the error transcript: Job aborted due to stage failure: Task 1.0:201 failed 4 times, most recent failure: Exception failure in TID 902 on host 192.168.13.11: java.io.FileNotFoundException: /tmp/spark-local-20140829131200-0bb7/08/shuffle_0_201_999 (Too many open files)
java.io.FileOutputStream.open(Native Method)
java.io.FileOutputStream.init(FileOutputStream.java:221)
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:116)
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:177)
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
org.apache.spark.util.collection.AppendOnlyMap$$anon$1.foreach(AppendOnlyMap.scala:159)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)
Driver stacktrace: -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-tp1464p13144.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
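Note that the FileNotFoundException is on a shuffle_* file, not on the input files: the handle count here is driven by shuffle output (roughly concurrent map tasks times reduce partitions per executor), which is why a 1024 ulimit can be exhausted even with only 30 inputs. Raising ulimit -n is the usual fix; on the Spark 1.x side, shuffle file consolidation also cuts the number of simultaneously open files. A sketch, with the app name as a placeholder:

import org.apache.spark.SparkConf

// Consolidate shuffle outputs so fewer files are open at once during the
// distinct()/countByKey() shuffle; available in the Spark 1.x line.
val conf = new SparkConf()
  .setAppName("manyFilesJob")
  .set("spark.shuffle.consolidateFiles", "true")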
Possible to make one executor be able to work on multiple tasks simultaneously?
I'm thinking of local mode where multiple virtual executors occupy the same vm. Can we have the same configuration in spark standalone cluster mode?
Re: Possible to make one executor be able to work on multiple tasks simultaneously?
Yes, executors run one task per core of your machine by default. You can also manually launch them with more worker threads than you have cores. What cluster manager are you on? Matei On August 29, 2014 at 11:24:33 AM, Victor Tso-Guillen (v...@paxata.com) wrote: I'm thinking of local mode where multiple virtual executors occupy the same vm. Can we have the same configuration in spark standalone cluster mode?
Re: DStream repartitioning, performance tuning processing
I wrote a long post about how I arrived here, but in a nutshell I don't see evidence of re-partitioning and workload distribution across the cluster. My new fangled way of starting the job is:
run=`date +%m-%d-%YT%T`; \
nohup spark-submit --class logStreamNormalizer \
--master yarn log-stream-normalizer_2.10-1.0.jar \
--jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar \
--driver-memory 8G \
--executor-memory 30G \
--executor-cores 16 \
--num-executors 8 \
--spark.serializer org.apache.spark.serializer.KryoSerializer \
--spark.rdd.compress true \
--spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
--spark.akka.threads 16 \
--spark.task.maxFailures 64 \
--spark.scheduler.mode FAIR \
> logs/normRunLog-$run.log \
2> logs/normRunLogError-$run.log & \
echo $! > logs/run-$run.pid
Since the job spits out lots of logs, here is how I am trying to determine if any tasks got assigned to non-local executors:
$ grep TID logs/normRunLogError-08-29-2014T18\:28\:32.log | grep Starting | grep -v NODE_LOCAL | grep -v PROCESS_LOCAL
Yields no lines. If I look at resource pool usage in YARN, this app is assigned 252.5GB of memory, 128 VCores and 9 containers. Am I missing something here? Thanks, Tim On Thu, Aug 28, 2014 at 11:55 PM, Tim Smith secs...@gmail.com wrote: I set partitions to 64:
kInMsg.repartition(64)
val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))
Still see all activity only on the two nodes that seem to be receiving from Kafka. On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith secs...@gmail.com wrote: TD - Apologies, didn't realize I was replying to you instead of the list. What does numPartitions refer to when calling createStream? I read an earlier thread that seemed to suggest that numPartitions translates to partitions created on the Spark side? http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E Actually, I re-tried with 64 numPartitions in createStream and that didn't work. I will manually set repartition to 64/128 and see how that goes. Thanks. On Thu, Aug 28, 2014 at 5:42 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Having 16 partitions in KafkaUtils.createStream does not translate to the RDDs in Spark / Spark Streaming having 16 partitions. Repartition is the best way to distribute the received data between all the nodes, as long as there are a sufficient number of partitions (try setting it to 2x the number of cores given to the application). Yeah, in 1.0.0, ttl should be unnecessary. On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith secs...@gmail.com wrote: On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das tathagata.das1...@gmail.com wrote: If you are repartitioning to 8 partitions, and your nodes happen to have at least 4 cores each, it's possible that all 8 partitions are assigned to only 2 nodes. Try increasing the number of partitions. Also make sure you have executors (allocated by YARN) running on more than two nodes if you want to use all 11 nodes in your YARN cluster. If you look at the code, I commented out the manual re-partitioning to 8. Instead, I am creating 16 partitions when I call createStream. But I will increase the partitions to, say, 64 and see if I get better parallelism. If you are using Spark 1.x, then you don't need to set the ttl for running Spark Streaming. In case you are using an older version, why do you want to reduce it?
You could reduce it, but it does increase the risk of premature cleaning, if once in a while things get delayed by 20 seconds. I don't see much harm in keeping the ttl at 60 seconds (a bit of extra garbage shouldn't hurt performance). I am running 1.0.0 (CDH5), so the ttl setting is redundant? But you are right, unless I have memory issues, more aggressive pruning won't help. Thanks, Tim TD On Thu, Aug 28, 2014 at 3:16 PM, Tim Smith secs...@gmail.com wrote: Hi, In my streaming app, I receive from Kafka where I have tried setting the partitions when calling createStream or later, by calling repartition - in both cases, the number of nodes running the tasks seems to be stubbornly stuck at 2. Since I have 11 nodes in my cluster, I was hoping to use more nodes. I am starting the job as:
nohup spark-submit --class logStreamNormalizer --master yarn log-stream-normalizer_2.10-1.0.jar --jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar --executor-memory 30G --spark.cleaner.ttl 60 --executor-cores 8 --num-executors 8 > normRunLog-6.log 2> normRunLogError-6.log & echo $! > run-6.pid
My main code is:
val sparkConf = new SparkConf().setAppName("SparkKafkaTest")
val ssc = new
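One thing worth double-checking in the snippets above: DStream.repartition returns a new DStream rather than changing the receiver stream in place, so mapping over the original kInMsg leaves the data unrepartitioned even after calling repartition. A sketch of the intended pattern, reusing the thread's own names:

// The repartitioned stream, not the original, is what must be mapped.
val repartitioned = kInMsg.repartition(64)
val outdata = repartitioned.map(x => normalizeLog(x._2, configMap))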
Re: Possible to make one executor be able to work on multiple tasks simultaneously?
Standalone. I'd love to tell it that my one executor can simultaneously serve, say, 16 tasks at once for an arbitrary number of distinct jobs. On Fri, Aug 29, 2014 at 11:29 AM, Matei Zaharia matei.zaha...@gmail.com wrote: Yes, executors run one task per core of your machine by default. You can also manually launch them with more worker threads than you have cores. What cluster manager are you on? Matei On August 29, 2014 at 11:24:33 AM, Victor Tso-Guillen (v...@paxata.com) wrote: I'm thinking of local mode where multiple virtual executors occupy the same vm. Can we have the same configuration in spark standalone cluster mode?
Re: Spark SQL : how to find element where a field is in a given set
1.0.2 On Friday, August 29, 2014, Michael Armbrust mich...@databricks.com wrote: What version are you using?
Re: DStream repartitioning, performance tuning processing
Crash again. On the driver, logs say:
14/08/29 19:04:55 INFO BlockManagerMaster: Removed 7 successfully in removeExecutor
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0:0 failed 4 times, most recent failure: TID 6383 on host node-dn1-2-acme.com failed for unknown reason Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
I went and looked at the OS on node-dn1-2 and the container logs for TID 6383 but found nothing:
# grep 6383 stderr
14/08/29 18:52:51 INFO CoarseGrainedExecutorBackend: Got assigned task 6383
14/08/29 18:52:51 INFO Executor: Running task ID 6383
However, the last message on the container is timestamped 19:04:51, which tells me the executor was killed for some reason right before the driver noticed that executor/task failure. How come my task failed after only 4 attempts although my config says the failure threshold is 64? On Fri, Aug 29, 2014 at 12:00 PM, Tim Smith secs...@gmail.com wrote: I wrote a long post about how I arrived here, but in a nutshell I don't see evidence of re-partitioning and workload distribution across the cluster.
Spark Streaming with Kafka, building project with 'sbt assembly' is extremely slow
Hi folks, I am trying to use Kafka with Spark Streaming, and it appears I cannot do the normal 'sbt package' as I do with other Spark applications, such as Spark alone or Spark with MLlib. I learned I have to build with the sbt-assembly plugin. OK, so here is my build.sbt file for my extremely simple test Kafka/Spark Streaming project. It takes almost 30 minutes to build! This is a CentOS Linux machine on SSDs with 4GB of RAM; it's never been slow for me. To compare, sbt assembly for the entire Spark project itself takes less than 10 minutes. At the bottom of this file I am trying to play with 'cacheOutput' options, because I read online that maybe I am calculating SHA-1 for all the *.class files in this super JAR. I also copied the mergeStrategy from Spark contributor TD's Spark Streaming tutorial from Spark Summit 2014. Again, is there some better way to build this JAR file than just using sbt package? The process is working, but very slow. Any help with speeding up this compilation is really appreciated!! Aris
import AssemblyKeys._ // put this at the top of the file
name := "streamingKafka"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.0.1"
)
assemblySettings
jarName in assembly := "streamingkafka-assembly.jar"
mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
  case "log4j.properties" => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}
assemblyOption in assembly ~= { _.copy(cacheOutput = false) }
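If the SHA-1 hashing of unpacked dependency classes is indeed the bottleneck, a sketch of going one step further with sbt-assembly's caching knobs (both fields exist on AssemblyOption in the 0.x plugin line; whether this actually helps on a given machine is worth measuring):

// Skip both the unzip cache and the output cache, so no content hashing
// is done across the dependency class files on each assembly run.
assemblyOption in assembly ~= { _.copy(cacheUnzip = false, cacheOutput = false) }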
Re: SparkSql is slow over yarn
Can you share more details about your job, cluster properties and configuration parameters? Thanks, Nishkam On Fri, Aug 29, 2014 at 11:33 AM, Chirag Aggarwal chirag.aggar...@guavus.com wrote: When I run SparkSql over yarn, it runs 2-4 times slower as compared to when its run in local mode. Please note that I have a four node yarn setup. Has anyone else also witnessed the same.
Re: Anyone know how to submit spark job to yarn in java code?
Hi, I am facing the same problem. Did you find any solution or workaround? Thanks and Regards, Archit Thakur. On Thu, Jan 16, 2014 at 6:22 AM, Liu, Raymond raymond@intel.com wrote: Hi, Regarding your questions: 1) When I run the above script, which jar is being submitted to the yarn server? What the SPARK_JAR env points to and what --jar points to are both submitted to the yarn server. 2) It looks like spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar plays the role of the client side, and spark-examples-assembly-0.8.1-incubating.jar goes with the spark runtime and examples which will be running in yarn, am I right? The spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar will also go to the yarn cluster as the runtime for the app jar (spark-examples-assembly-0.8.1-incubating.jar). 3) Does anyone have any similar experience? I did lots of Hadoop MR stuff and want to follow the same logic to submit a spark job. For now I can only find the command line way to submit a spark job to yarn. I believe there is an easy way to integrate Spark into a web application. You can use the yarn-client mode; you might want to take a look at docs/running-on-yarn.md, and you might want to try the master branch to check our latest update on this part of the docs. In yarn-client mode, the SparkContext itself will do a similar thing to what the command line is doing to submit a yarn job. Then to use it with Java, you might want to try out JavaSparkContext instead of SparkContext. I don't personally run it with complicated applications, but a small example app did work. Best Regards, Raymond Liu -Original Message- From: John Zhao [mailto:jz...@alpinenow.com] Sent: Thursday, January 16, 2014 2:25 AM To: u...@spark.incubator.apache.org Subject: Anyone know how to submit spark job to yarn in java code? Now I am working on a web application and I want to submit a spark job to hadoop yarn. I have already done my own assembly and can run it on the command line with the following script:
export YARN_CONF_DIR=/home/gpadmin/clusterConfDir/yarn
export SPARK_JAR=./assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar
./spark-class org.apache.spark.deploy.yarn.Client --jar ./examples/target/scala-2.9.3/spark-examples-assembly-0.8.1-incubating.jar --class org.apache.spark.examples.SparkPi --args yarn-standalone --num-workers 3 --master-memory 1g --worker-memory 512m --worker-cores 1
It works fine. Then I realized that it is hard to submit the job from a web application. It looks like the spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar or spark-examples-assembly-0.8.1-incubating.jar is a really big jar; I believe it contains everything. So my questions are: 1) When I run the above script, which jar is being submitted to the yarn server? 2) It looks like spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar plays the role of the client side and spark-examples-assembly-0.8.1-incubating.jar goes with the spark runtime and examples which will be running in yarn, am I right? 3) Does anyone have any similar experience? I did lots of Hadoop MR stuff and want to follow the same logic to submit a spark job. For now I can only find the command line way to submit a spark job to yarn. I believe there is an easy way to integrate Spark into a web application. Thanks. John.
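A rough sketch of Raymond's yarn-client suggestion from application code, in Scala (the JavaSparkContext version would be analogous); the jar path and app name are placeholders, and HADOOP_CONF_DIR/YARN_CONF_DIR must be visible to the JVM running the web application:

import org.apache.spark.{SparkConf, SparkContext}

// In yarn-client mode the driver runs inside the web application's JVM
// and executors are requested from YARN, so no external spark-class
// invocation is needed.
val conf = new SparkConf()
  .setMaster("yarn-client")
  .setAppName("webAppJob")
  .setJars(Seq("/path/to/app-assembly.jar"))  // placeholder path

val sc = new SparkContext(conf)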
[PySpark] large # of partitions causes OOM
Here’s a repro for PySpark:
a = sc.parallelize(["Nick", "John", "Bob"])
a = a.repartition(24000)
a.keyBy(lambda x: len(x)).reduceByKey(lambda x, y: x + y).take(1)
When I try this on an EC2 cluster with 1.1.0-rc2 and Python 2.7, this is what I get:
14/08/29 21:53:40 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, ip-10-138-29-167.ec2.internal, 46252, 0) with no recent heart beats: 175143ms exceeds 45000ms
14/08/29 21:53:50 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(10, ip-10-138-18-106.ec2.internal, 33711, 0) with no recent heart beats: 175359ms exceeds 45000ms
14/08/29 21:54:02 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(19, ip-10-139-36-207.ec2.internal, 52208, 0) with no recent heart beats: 173061ms exceeds 45000ms
14/08/29 21:54:13 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(5, ip-10-73-142-70.ec2.internal, 56162, 0) with no recent heart beats: 176816ms exceeds 45000ms
14/08/29 21:54:22 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(7, ip-10-236-145-200.ec2.internal, 40959, 0) with no recent heart beats: 182241ms exceeds 45000ms
14/08/29 21:54:40 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(4, ip-10-139-1-195.ec2.internal, 49221, 0) with no recent heart beats: 178406ms exceeds 45000ms
14/08/29 21:54:41 ERROR Utils: Uncaught exception in thread Result resolver thread-3
java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:162)
at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:514)
at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:355)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:68)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread Result resolver thread-3
14/08/29 21:56:26 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(ip-10-73-142-223.ec2.internal,54014)
java.nio.channels.ClosedChannelException
at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:162)
at
Re: Low Level Kafka Consumer for Spark
Good to see I am not the only one who cannot get incoming DStreams to repartition. I tried repartition(512) but still no luck - the app stubbornly runs only on two nodes. Now this is 1.0.0, but looking at the release notes for 1.0.1 and 1.0.2, I don't see anything that says this was an issue and has been fixed. How do I debug the repartition() statement to see what's the flow after the job hits that statement? On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat bvenkat.sp...@gmail.com wrote: Chris, I did the DStream.repartition mentioned in the document on parallelism in receiving, as well as set spark.default.parallelism, and it still uses only 2 nodes in my cluster. I notice there is another email thread on the same topic: http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html My code is in Java and here is what I have:
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(ssc, zkQuorum, "cse-job-play-consumer", kafkaTopicMap);
JavaPairDStream<String, String> newMessages = messages.repartition(partitionSize); // partitionSize=30
JavaDStream<String> lines = newMessages.map(new Function<Tuple2<String, String>, String>() {
  ...
  public String call(Tuple2<String, String> tuple2) {
    return tuple2._2();
  }
});
JavaDStream<String> words = lines.flatMap(new MetricsComputeFunction());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() { ... });
wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() { ... });
Thanks, Bharat -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: [Spark Streaming] kafka consumer announce
TD, can you please comment on this code? I am really interested in including this code in Spark, but I am bothered by some points about persistence: 1. When we extend Receiver and call store, is it a blocking call? Does it return only when Spark stores the RDD as requested (i.e. replicated or on disk)? Or is there some buffer allowing data loss? In the latter case, can we have some callback telling us to proceed with storing offsets? 2. I saw you implemented some rate limiting. Can you clarify how it works? In the face of a network receiver getting data as fast as it can, and you limiting this data in the BlockManager, what happens with the exceeding data? Is it discarded? And if not, what happens? There are a lot of open questions on how to make streaming reliable, and I have plenty of questions off-list, but I do not know how to improve the code without Spark support. On 21 Aug 2014, at 16:17, Evgeniy Shishkin itparan...@gmail.com wrote: Hello, we are glad to announce yet another Kafka input stream, available at https://github.com/wgnet/spark-kafka-streaming It is used in production for about 3 months. We will be happy to hear your feedback. Custom Spark Kafka consumer based on the Kafka SimpleConsumer API. Features • discovers kafka metadata from zookeeper (more reliable than from brokers, does not depend on broker list changes) • reading from multiple topics • reliably handles leader election and topic reassignment • saves offsets and stream metadata in hbase (more robust than zookeeper) • supports metrics via the spark metrics mechanism (jmx, graphite, etc.) Todo • abstract offset storage • time-controlled offsets commit • refactor kafka message to rdd elements transformation (flatmapper method)
Re: Spark SQL : how to find element where a field is in a given set
This feature was not part of that version. It will be in 1.1. On Fri, Aug 29, 2014 at 12:33 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: 1.0.2
What is the better data structure in an RDD
I need some advice regarding how data are stored in an RDD. I have millions of records, called Measures. They are bucketed with keys of String type. I wonder whether I should store them as RDD[(String, Measure)] or RDD[(String, Iterable[Measure])], and why? Data in each bucket are not related most of the time. The operations that I often need to do are: - Sort the Measures in each bucket separately - Aggregate the Measures in each bucket separately - Combine Measures in two RDDs into one based on their bucket keys -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-better-data-structure-in-an-RDD-tp13159.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
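For reference, all three operations fall out naturally from the flat RDD[(String, Measure)] layout, which also lets Spark choose the grouping strategy per operation instead of keeping whole buckets materialized. A sketch, where Measure, its timestamp field, combine, and otherMeasures are placeholders:

import org.apache.spark.SparkContext._   // pair-RDD functions in Spark 1.x
import org.apache.spark.rdd.RDD

// Aggregate per bucket without materializing whole buckets in memory.
val aggregated: RDD[(String, Measure)] = measures.reduceByKey(combine)

// Sort within each bucket; this one does need the full bucket at once.
val sorted: RDD[(String, Seq[Measure])] =
  measures.groupByKey().mapValues(_.toSeq.sortBy(_.timestamp))

// Combine two RDDs on their bucket keys.
val joined: RDD[(String, (Measure, Measure))] = measures.join(otherMeasures)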
Re: Low Level Kafka Consumer for Spark
I create my DStream very simply as:
val kInMsg = KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 8))
Eventually, before I operate on the DStream, I repartition it:
kInMsg.repartition(512)
Are you saying that repartition doesn't split my DStream into multiple smaller streams? Should I manually create multiple DStreams like this?
val kInputs = (1 to 10).map { _ => KafkaUtils.createStream(...) }
Then I apply some custom logic to it as:
val outdata = kInMsg.map(x => normalizeLog(x._2, configMap)) // where normalizeLog takes a String and a Map of regexes and returns a string
In my case, I think I have traced the issue to the receiver executor being killed by Yarn:
14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on node-dn1-4-acme.com: remote Akka client disassociated
Could this be the root cause? http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html https://issues.apache.org/jira/browse/SPARK-2121 On Fri, Aug 29, 2014 at 3:28 PM, Sean Owen so...@cloudera.com wrote: Are you using multiple DStreams? Repartitioning does not affect how many receivers you have. It's on 2 nodes for each receiver. You need multiple partitions in the queue, each consumed by a DStream, if you mean to parallelize consuming the queue. On Fri, Aug 29, 2014 at 11:08 PM, Tim Smith secs...@gmail.com wrote: Good to see I am not the only one who cannot get incoming DStreams to repartition.
Re: Spark Streaming reset state
You can use a tuple associating a timestamp with your running sum, and have COMPUTE_RUNNING_SUM reset the running sum to zero when the timestamp is more than 5 minutes old. You'll still have a leak doing so if your keys keep changing, though. --Christophe

2014-08-29 9:00 GMT-07:00 Eko Susilo eko.harmawan.sus...@gmail.com: So codes is currently holding RDDs containing codes and their respective counters. I would like to find a way to reset those RDDs after some period of time.

On Fri, Aug 29, 2014 at 5:55 PM, Sean Owen so...@cloudera.com wrote: codes is a DStream, not an RDD. The remember() method controls how long Spark Streaming holds on to the RDDs itself. Clarify what you mean by reset? codes provides a stream of RDDs that contain your computation over a window of time. New RDDs come with the computation over new data.

On Fri, Aug 29, 2014 at 4:30 PM, Eko Susilo eko.harmawan.sus...@gmail.com wrote: Hi all, I would like to ask some advice about resetting a Spark stateful operation. So I tried like this:

JavaStreamingContext jssc = new JavaStreamingContext(context, new Duration(5000));
jssc.remember(new Duration(5 * 60 * 1000));
jssc.checkpoint(ApplicationConstants.HDFS_STREAM_DIRECTORIES);
JavaPairReceiverInputDStream<String, String> messages = (JavaPairReceiverInputDStream<String, String>) KafkaUtils.createStream(jssc, "localhost:2181", "test-consumer-group", topicMap);
JavaPairDStream<String, String> windowed = messages.window(WINDOW_LENGTH, SLIDE_INTERVAL);
JavaDStream<LogEntry> lines = windowed.map(new Function<Tuple2<String, String>, LogEntry>() {
    @Override
    public LogEntry call(Tuple2<String, String> tuple2) {
        LogEntry _Result = Utils.parseLine(tuple2._2());
        return _Result;
    }
}).filter(Functions.FILTER_LOG_ENTRY).cache();
JavaPairDStream<String, Long> codes = lines.mapToPair(Functions.GET_CODE)
    .reduceByKey(Functions.SUM_REDUCER)
    .updateStateByKey(COMPUTE_RUNNING_SUM);

I thought that by setting remember to 5 minutes, the codes RDDs derived from messages would also be reset in 5 minutes, but in fact no. Is there any way to reset the codes RDDs after a period of time (5 minutes)? Thanks -- Best Regards, Eko Susilo
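A minimal Scala sketch of Christophe's suggestion, assuming the running sums are Longs (the names and the five-minute constant are illustrative): keep the last reset time in the state next to the sum, and zero the sum whenever that timestamp is older than the reset interval.

val ResetIntervalMs = 5 * 60 * 1000L

// State is (runningSum, lastResetTimestamp).
def computeRunningSum(newValues: Seq[Long], state: Option[(Long, Long)]): Option[(Long, Long)] = {
  val now = System.currentTimeMillis()
  val (sum, lastReset) = state.getOrElse((0L, now))
  val (baseSum, baseReset) =
    if (now - lastReset > ResetIntervalMs) (0L, now)  // window expired: restart from zero
    else (sum, lastReset)
  Some((baseSum + newValues.sum, baseReset))
}

// Applied to a DStream[(String, Long)] of (code, count) pairs:
// val codes = counts.updateStateByKey(computeRunningSum _)

As Christophe notes, state entries for keys that stop arriving are never dropped, so this still leaks if the key space keeps growing.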
Re: Possible to make one executor be able to work on multiple tasks simultaneously?
Any more thoughts on this? I'm not sure how to do this yet.

On Fri, Aug 29, 2014 at 12:10 PM, Victor Tso-Guillen v...@paxata.com wrote: Standalone. I'd love to tell it that my one executor can simultaneously serve, say, 16 tasks at once for an arbitrary number of distinct jobs.

On Fri, Aug 29, 2014 at 11:29 AM, Matei Zaharia matei.zaha...@gmail.com wrote: Yes, executors run one task per core of your machine by default. You can also manually launch them with more worker threads than you have cores. What cluster manager are you on? Matei

On August 29, 2014 at 11:24:33 AM, Victor Tso-Guillen (v...@paxata.com) wrote: I'm thinking of local mode, where multiple virtual executors occupy the same VM. Can we have the same configuration in Spark standalone cluster mode?
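A sketch of Matei's suggestion for standalone mode, with an illustrative value: a worker offers SPARK_WORKER_CORES task slots, and the scheduler will run that many tasks concurrently in that worker's executor, whether or not the machine physically has that many cores. In conf/spark-env.sh on each worker:

# Advertise 16 task slots, even if the machine has fewer physical cores
export SPARK_WORKER_CORES=16

This oversubscribes the CPU, so it mainly helps when tasks are I/O-bound rather than compute-bound.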
Re: Low Level Kafka Consumer for Spark
Ok, so I did this:

val kInStreams = (1 to 10).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1))
}
val kInMsg = ssc.union(kInStreams)
val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))

This has improved parallelism. Earlier I would only get a Stream 0; now I have Streams [0-9]. Of course, since the Kafka topic has only three partitions, only three of those streams are active, but I am seeing more blocks being pulled across the three streams in total than what one stream was doing earlier. Also, four nodes are actively processing tasks now (vs. only two earlier), which actually has me confused. If streams are active on only 3 nodes, then how/why did a 4th node get work? And if a 4th got work, why aren't more nodes getting work?
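On the three-partition point above: with 10 receivers against a 3-partition topic, 7 receivers sit idle. A minimal sketch (values illustrative) that matches the receiver count to the partition count and still spreads the downstream processing across the cluster:

// One receiver per Kafka partition; repartition spreads the map work
// beyond the three receiver nodes.
val kInStreams = (1 to 3).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1))
}
val kInMsg = ssc.union(kInStreams).repartition(512)
val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))

This would also be consistent with the 4th-node observation: receivers pin three executors, but tasks over the unioned (and replicated) blocks can be scheduled on other executors.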
RE: [Streaming] Akka-based receiver with messages defined in uploaded jar
Just checked it with 1.0.2. Still the same exception.

From: Anton Brazhnyk [mailto:anton.brazh...@genesys.com] Sent: Wednesday, August 27, 2014 6:46 PM To: Tathagata Das Cc: user@spark.apache.org Subject: RE: [Streaming] Akka-based receiver with messages defined in uploaded jar

Sorry for the delay with the answer - was on vacation. As I said, I was using a modified version of the launcher from the example. The modification is just about setting the Spark master URL in the code so as not to use the run-example script. The launcher itself was in the attached zip (attaching it once more) as the ActorWordCount object.

From: Tathagata Das [mailto:tathagata.das1...@gmail.com] Sent: Tuesday, August 05, 2014 11:32 PM To: Anton Brazhnyk Cc: user@spark.apache.org Subject: Re: [Streaming] Akka-based receiver with messages defined in uploaded jar

How are you launching/submitting the program? Using spark-submit? Or some other script (can you provide that)? TD

On Tue, Aug 5, 2014 at 6:52 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: Went through it once again to leave only the modification in question. Still the same exception. I hope sources as a zip file (instead of github) can still be tolerated. :) Here is the stacktrace generated with these sources:

14/08/05 18:45:54 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1407289554800
14/08/05 18:45:54 ERROR Remoting: org.apache.spark.examples.streaming.CustomMessage
java.lang.ClassNotFoundException: org.apache.spark.examples.streaming.CustomMessage
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623)
    at akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
    at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
    at scala.util.Try$.apply(Try.scala:161)
    at akka.serialization.Serialization.deserialize(Serialization.scala:98)
    at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
    at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
    at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

-----Original Message-----
From: Tathagata Das [mailto:tathagata.das1...@gmail.com] Sent: Tuesday, August 05, 2014 5:42 PM To: Anton Brazhnyk Cc: user@spark.apache.org Subject: Re: [Streaming] Akka-based receiver with messages defined in uploaded jar

Can you show us the modified version? The reason could very well be what you suggest, but I want to understand what conditions lead to this. TD

On Tue, Aug 5, 2014 at 3:55 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: Greetings, I modified the ActorWordCount example a little and it uses a simple case class as the message type.
Re: [Streaming] Akka-based receiver with messages defined in uploaded jar
Can you try adding the JAR to the class path of the executors directly, by setting the config spark.executor.extraClassPath in the SparkConf? See the Configuration page: http://spark.apache.org/docs/latest/configuration.html#runtime-environment

I think what you guessed is correct. The Akka actor system is not aware of the classes that are dynamically added when the custom jar is added with setJar. TD

On Fri, Aug 29, 2014 at 6:44 PM, Anton Brazhnyk anton.brazh...@genesys.com wrote: Just checked it with 1.0.2. Still the same exception.
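A minimal Scala sketch of TD's suggestion (the master URL and jar path are illustrative; the jar must already exist at that path on each worker node): set spark.executor.extraClassPath on the SparkConf used to build the StreamingContext, so the message classes are on the executors' class path before Akka tries to deserialize them.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Put the jar containing the custom message classes on the executors'
// class path up front, instead of relying on setJar/addJar at runtime.
val conf = new SparkConf()
  .setMaster("spark://master:7077")                                       // illustrative
  .setAppName("ActorWordCount")
  .set("spark.executor.extraClassPath", "/opt/jars/custom-messages.jar")  // illustrative path
val ssc = new StreamingContext(conf, Seconds(2))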
Re: Too many open files
Oops, the last reply didn't go to the user list. Mail app's fault. Shuffling happens across the cluster, so you need to change all the nodes in the cluster. Sent from my iPhone

On Aug 30, 2014, at 3:10, Sudha Krishna skrishna...@gmail.com wrote: Hi, thanks for your response. Do you know if I need to change this limit on all the cluster nodes or just the master? Thanks

On Aug 29, 2014 11:43 AM, Ye Xianjin advance...@gmail.com wrote: 1024 for the file limit is most likely too small for Linux machines in production. Try setting it to 65536, or unlimited if you can. The "too many open files" error occurs because there are a lot of shuffle files (if wrong, please correct me). Sent from my iPhone

On Aug 30, 2014, at 2:06, SK skrishna...@gmail.com wrote: Hi, I am having the same problem reported by Michael. I am trying to open 30 files. ulimit -n shows the limit is 1024. So I am not sure why the program is failing with a "Too many open files" error. The total size of all 30 files is 230 GB. I am running the job on a cluster with 10 nodes, each having 16 GB. The error appears to be happening at the distinct() stage. Here is my program. In the following code, are all 10 nodes trying to open all of the 30 files, or are the files distributed among the 10 nodes?

val baseFile = "/mapr/mapr_dir/files_2013apr*"
val x = sc.textFile(baseFile).map { line =>
  val fields = line.split("\t")
  (fields(11), fields(6))
}.distinct().countByKey()
val xrdd = sc.parallelize(x.toSeq)
xrdd.saveAsTextFile(...)

Instead of using the glob *, I guess I can try using a for loop to read the files one by one if that helps, but I am not sure if there is a more efficient solution. The following is the error transcript:

Job aborted due to stage failure: Task 1.0:201 failed 4 times, most recent failure: Exception failure in TID 902 on host 192.168.13.11: java.io.FileNotFoundException: /tmp/spark-local-20140829131200-0bb7/08/shuffle_0_201_999 (Too many open files)
    java.io.FileOutputStream.open(Native Method)
    java.io.FileOutputStream.<init>(FileOutputStream.java:221)
    org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:116)
    org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:177)
    org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
    org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
    scala.collection.Iterator$class.foreach(Iterator.scala:727)
    org.apache.spark.util.collection.AppendOnlyMap$$anon$1.foreach(AppendOnlyMap.scala:159)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
    org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    org.apache.spark.scheduler.Task.run(Task.scala:51)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    java.lang.Thread.run(Thread.java:744)
Driver stacktrace:
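Two mitigations are worth trying here, sketched with illustrative values. The descriptor limit has to be raised on every node that runs executors, since each node writes its own shuffle files locally; and on Spark 1.x, shuffle file consolidation reduces how many files a shuffle creates (by default each map task writes one file per reduce partition, so the file count grows roughly as maps x reducers).

# /etc/security/limits.conf on every worker node (re-login or restart the worker after):
*  soft  nofile  65536
*  hard  nofile  65536

# conf/spark-defaults.conf (Spark 1.x option; default is false):
spark.shuffle.consolidateFiles  true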