RE: problem connection to hdfs on localhost from spark-shell

2014-08-29 Thread linkpatrickliu
Change your conf/spark-env.sh:
export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/hadoop/conf
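
A quick way to double-check from spark-shell which namenode URI Spark actually picks up (a minimal sketch, not from the original reply; sc is the shell's SparkContext):

// Print the filesystem URI the Hadoop config resolves to, then read with a matching path.
val fsUri = Option(sc.hadoopConfiguration.get("fs.defaultFS"))
  .getOrElse(sc.hadoopConfiguration.get("fs.default.name"))   // older Hadoop key
println(fsUri)                                                // e.g. hdfs://localhost:9000
val lines = sc.textFile(fsUri + "/test")
println(lines.count())
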
Date: Thu, 28 Aug 2014 16:19:05 -0700
From: ml-node+s1001560n13074...@n3.nabble.com
To: linkpatrick...@live.com
Subject: problem connection to hdfs on localhost from spark-shell






I have HDFS servers running locally and hadoop dfs -ls / are all running fine.
From spark-shell I do this:
val lines = sc.textFile("hdfs:///test")
and I get this error message.

java.io.IOException: Failed on local exception: java.io.EOFException; Host 
Details : local host is: localhost.localdomain/127.0.0.1; destination host 
is: localhost:9000;

I tried changing the contents of /etc/hosts to no avail. I also tried urls like 
hdfs://localhost:9000/test and many other variants. Nothing worked. Also I see 
a message (during spark-shell startup) that it has bound to a 192.168.x.x 
address. Any help is appreciated.

-- Bharath


u'' notation with pyspark output data

2014-08-29 Thread Oleg Ruchovets
Hi ,
  I am working with pyspark and doing simple aggregation


def doSplit(x):
    y = x.split(',')
    if len(y) == 3:
        return y[0], (y[1], y[2])

counts = lines.map(doSplit).groupByKey()
output = counts.collect()

Iterating over the output I get data in the format u'1385501280', u'14.0',
but I actually need to work with 14 instead of u'14.0' and 1385501280 instead
of u'1385501280'.

Question:
   how do I get the actual data without the u'' notation?


Thanks
Oleg.


Re: RE: The concurrent model of spark job/stage/task

2014-08-29 Thread 35597...@qq.com
hi, dear

  Thanks for the response. Some comments below, and yes, I am using Spark on 
YARN.
1. The release doc of Spark says multiple jobs can be submitted in one application 
if the jobs (actions) are submitted by different threads. I wrote some Java thread 
code in the driver, one action in each thread, and the stages run concurrently, 
which I observed on the stages UI (see the sketch after this list).
In my understanding the DAGScheduler generates a different graph for each action. 
Not sure whether that is correct.
Originally I was hoping the SparkContext could generate different jobs for 
unrelated actions, but I never got that to work.

2. If the DAGScheduler generates a graph as below, can 1 and 2 run concurrently?

3. I want to retrieve the original data out of the RDD and do other computation 
on the data, like getting the value of temperature or other fields, and working on them.
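
A minimal Scala sketch of the pattern described in point 1 (the original used Java threads, but the idea is the same; names and sizes here are made up):

import org.apache.spark.{SparkConf, SparkContext}

object TwoConcurrentJobs {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("TwoConcurrentJobs"))
    val a = sc.parallelize(1 to 1000000)
    val b = sc.parallelize(1 to 1000000)

    // Each action is submitted from its own thread, so the DAGScheduler
    // can schedule the two resulting jobs concurrently within one SparkContext.
    val t1 = new Thread(new Runnable { def run(): Unit = println("count of a: " + a.count()) })
    val t2 = new Thread(new Runnable { def run(): Unit = println("sum of b: " + b.reduce(_ + _)) })
    t1.start(); t2.start()
    t1.join(); t2.join()
    sc.stop()
  }
}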



35597...@qq.com
 
From: linkpatrickliu
Date: 2014-08-29 14:01
To: user
Subject: RE: The concurrent model of spark job/stage/task
Hi, 

Please see the answers following each question. If there's any mistake, please 
let me know. Thanks!

I am not sure which mode you are running, so I will assume you are using the 
spark-submit script to submit Spark applications to a Spark cluster 
(spark-standalone or YARN).

1. how to start 2 or more jobs in one spark driver, in java code.. I wrote 2 
actions in the code, but the jobs still show up as stages with index 0, 1, 2, 3... 
it looks like they run sequentially.
A Spark application is a job; you init the application by creating a 
SparkContext. The SparkContext will init the driver program for you.
So if you want to run multiple jobs simultaneously, you have to split the jobs 
into different applications, and submit each of them.

The driver program is like an ApplicationMaster in YARN. It translates the Spark 
application into a DAG graph, and schedules each stage to workers. Each stage 
consists of multiple tasks.
The driver program handles the life cycle of a Spark application. 

2. are the stages run concurrently? because they always number in order 0, 1, 2, 
3.. as observed on the spark stage UI.
No. Stages run sequentially. It's a DAG graph; each stage depends on its 
parent.

3. Can I retrieve the data out of an RDD? like populate a pojo myself and compute 
on it.
Not sure what you mean?
You can only retrieve an RDD related to your own SparkContext. 
But once a Spark application is finished, the SparkContext is released. RDDs 
related to that SparkContext are released too.

Best regards,
Patrick Liu



Date: Thu, 28 Aug 2014 18:35:44 -0700
From: [hidden email]
To: [hidden email]
Subject: The concurrent model of spark job/stage/task

hi, guys

  I am trying to understand how Spark works on the concurrent model. I read 
the following from https://spark.apache.org/docs/1.0.2/job-scheduling.html 

quote
 Inside a given Spark application (SparkContext instance), multiple parallel 
jobs can run simultaneously if they were submitted from separate threads. By 
“job”, in this section, we mean a Spark action (e.g. save, collect) and any 
tasks that need to run to evaluate that action. Spark’s scheduler is fully 
thread-safe and supports this use case to enable applications that serve 
multiple requests (e.g. queries for multiple users).

I searched everywhere but could not figure out:
1. how to start 2 or more jobs in one spark driver, in java code.. I wrote 2 
actions in the code, but the jobs still show up as stages with index 0, 1, 2, 3... 
it looks like they run sequentially.
2. are the stages run concurrently? because they always number in order 0, 1, 2, 
3.. as observed on the spark stage UI.
3. Can I retrieve the data out of an RDD? like populate a pojo myself and compute 
on it.

Thanks in advance, guys.



[hidden email]






Re: How to get prerelease thriftserver working?

2014-08-29 Thread Matt Chu
Hey Michael, Cheng,

Thanks for the replies. Sadly I can't remember the specific error so I'm
going to chalk it up to user error, especially since others on the list
have not had a problem.

@michael By the way, was at the Spark 1.1 meetup yesterday. Great event,
very informative, cheers and keep doing more!

@cheng Got it, cheers. Fortunately we don't have to deal with this use
case, but that's good to know (especially the $SPARK_HOME bit).




On Wed, Aug 27, 2014 at 3:36 PM, Cheng Lian lian.cs@gmail.com wrote:

 Hey Matt, if you want to access existing Hive data, you still need to
 run a Hive metastore service, and provide a proper hive-site.xml (just
 drop it in $SPARK_HOME/conf).

 Could you provide the error log you saw?
 ​


 On Wed, Aug 27, 2014 at 12:09 PM, Michael Armbrust mich...@databricks.com
  wrote:

 I would expect that to work.  What exactly is the error?


 On Wed, Aug 27, 2014 at 6:02 AM, Matt Chu m...@kabam.com wrote:

 (apologies for sending this twice, first via nabble; didn't realize it
 wouldn't get forwarded)

 Hey, I know it's not officially released yet, but I'm trying to
 understand (and run) the Thrift-based JDBC server, in order to enable
 remote JDBC access to our dev cluster.

 Before asking about details, is my understanding of this correct?
 `sbin/start-thriftserver` is a JDBC/Hive server that doesn't require
 running a Hive+MR cluster (i.e. just Spark/Spark+YARN)?

 Assuming yes, I have hope that it all basically works, just that some
 documentation needs to be cleaned up:

 - I found a release page implying that 1.1 will be released pretty
 soon-ish:
 https://cwiki.apache.org/confluence/display/SPARK/Wiki+Homepage
 - I can find recent (within the last 30 days or so) activity with promising
 titles: [Updated Spark SQL README to include the hive-thriftserver
 module](https://github.com/apache/spark/pull/1867),
 [[SPARK-2410][SQL] Merging Hive Thrift/JDBC server (with Maven profile
 fix)](https://github.com/apache/spark/pull/1620)

 Am I following all the right email threads, issues trackers, and
 whatnot?

 Specifically, I tried:

 1. Building off of `branch-1.1`, synced as of ~today (2014 Aug 25)
 2. Running `sbin/start-thriftserver.sh` in `yarn-client` mode
 3. Can see the processing running, and the spark context/app created in
 yarn logs,
 and can connect to the thrift server on the default port of 10000 using
 `bin/beeline`
 4. However, when I try to find out what that cluster has via `show
 tables;`, in the logs
 I see a connection error to some (what I assume to be) random port.

 So what service am I forgetting/too ignorant to run? Or did I
 misunderstand and we do need a live Hive instance to back thriftserver? Or
 is this a YARN-specific issue?

 Only recently started learning the ecosystem and community, so apologies
 for the longer post and lots of questions. :)

 Matt






Re: Spark SQL : how to find element where a field is in a given set

2014-08-29 Thread Jaonary Rabarisoa
ok, but what if I have a long list? Do I need to hard code every element of my
list like this, or is there a function that translates a list into a
tuple?


On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust mich...@databricks.com
wrote:

 You don't need the Seq, as in is a variadic function.

 personTable.where('name in ("foo", "bar"))



 On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 Hi all,

 What is the expression that I should use with the spark sql DSL if I need to
 retrieve
 data with a field in a given set.
 For example :

 I have the following schema

 case class Person(name: String, age: Int)

 And I need to do something like :

 personTable.where('name in Seq("foo", "bar")) ?


 Cheers.


 Jaonary





Re: Failed to run runJob at ReceiverTracker.scala

2014-08-29 Thread Tim Smith
I upped the ulimit to 128k files on all nodes. The job crashed again with
"DAGScheduler: Failed to run runJob at ReceiverTracker.scala:275".
I couldn't get the logs because I killed the job, and it looks like YARN
wipes the container logs (not sure why it wipes the logs under
/var/log/hadoop-yarn/container). Next time, I will grab the logs while
the job is still active/zombie.

So is there a limit on how many times a receiver is re-spawned?

Thanks,

Tim


On Thu, Aug 28, 2014 at 10:06 PM, Tathagata Das
tathagata.das1...@gmail.com wrote:
 It did. It failed and was respawned 4 times.
 In this case, the "too many open files" error is a sign that you need to increase
 the system-wide limit of open files.
 Try adding "ulimit -n 16000" to your conf/spark-env.sh.

 TD


 On Thu, Aug 28, 2014 at 5:29 PM, Tim Smith secs...@gmail.com wrote:

 Appeared after running for a while. I re-ran the job and this time, it
 crashed with:
 14/08/29 00:18:50 WARN ReceiverTracker: Error reported by receiver for
 stream 0: Error in block pushing thread - java.net.SocketException: Too many
 open files

 Shouldn't the failed receiver get re-spawned on a different worker?



 On Thu, Aug 28, 2014 at 4:12 PM, Tathagata Das
 tathagata.das1...@gmail.com wrote:

 Do you see this error right in the beginning or after running for
 sometime?

 The root cause seems to be that somehow your Spark executors got killed,
 which killed receivers and caused further errors. Please try to take a look
 at the executor logs of the lost executor to find what is the root cause
 that caused the executor to fail.

 TD


 On Thu, Aug 28, 2014 at 3:54 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 Have a Spark-1.0.0 (CDH5) streaming job reading from kafka that died
 with:

 14/08/28 22:28:15 INFO DAGScheduler: Failed to run runJob at
 ReceiverTracker.scala:275
 Exception in thread Thread-59 14/08/28 22:28:15 INFO
 YarnClientClusterScheduler: Cancelling stage 2
 14/08/28 22:28:15 INFO DAGScheduler: Executor lost: 5 (epoch 4)
 14/08/28 22:28:15 INFO BlockManagerMasterActor: Trying to remove
 executor 5 from BlockManagerMaster.
 14/08/28 22:28:15 INFO BlockManagerMaster: Removed 5 successfully in
 removeExecutor
 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 2.0:0 failed 4 times, most recent failure: TID 6481 on host
 node-dn1-1.ops.sfdc.net failed for unknown reason
 Driver stacktrace:
 at
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
 at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


 Any insights into this error?

 Thanks,

 Tim








Re: DStream repartitioning, performance tuning processing

2014-08-29 Thread Tim Smith
I set partitions to 64:

//
 kInMsg.repartition(64)
 val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))
//

Still see all activity only on the two nodes that seem to be receiving
from Kafka.
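
One thing worth checking here (a hedged note, not from the thread): DStream.repartition returns a new DStream rather than modifying kInMsg in place, so for the 64 partitions to take effect the map has to be chained on its result. A minimal sketch with the same names as above:

// Use the repartitioned stream, not the original receiver stream.
val repartitioned = kInMsg.repartition(64)
val outdata = repartitioned.map(x => normalizeLog(x._2, configMap))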

On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith secs...@gmail.com wrote:
 TD - Apologies, didn't realize I was replying to you instead of the list.

 What does numPartitions refer to when calling createStream? I read an
 earlier thread that seemed to suggest that numPartitions translates to
 partitions created on the Spark side?
 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E

 Actually, I re-tried with 64 numPartitions in createStream and that didn't
 work. I will manually set repartition to 64/128 and see how that goes.

 Thanks.




 On Thu, Aug 28, 2014 at 5:42 PM, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 Having 16 partitions in KafkaUtils.createStream does not translate to the
 RDDs in Spark / Spark Streaming having 16 partitions. Repartition is the
 best way to distribute the received data between all the nodes, as long as
 there is a sufficient number of partitions (try setting it to 2x the number
 of cores given to the application).

 Yeah, in 1.0.0, ttl should be unnecessary.



 On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith secs...@gmail.com wrote:

 On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das
 tathagata.das1...@gmail.com wrote:

 If you are repartitioning to 8 partitions, and your nodes happen to have
 at least 4 cores each, it's possible that all 8 partitions are assigned to
 only 2 nodes. Try increasing the number of partitions. Also make sure you
 have executors (allocated by YARN) running on more than two nodes if you
 want to use all 11 nodes in your yarn cluster.


 If you look at the code, I commented out the manual re-partitioning to 8.
 Instead, I am created 16 partitions when I call createStream. But I will
 increase the partitions to, say, 64 and see if I get better parallelism.



 If you are using Spark 1.x, then you don't need to set the ttl for
 running Spark Streaming. In case you are using an older version, why do you
 want to reduce it? You could reduce it, but it does increase the risk of
 premature cleaning, if once in a while things get delayed by 20 seconds. I
 don't see much harm in keeping the ttl at 60 seconds (a bit of extra garbage
 shouldn't hurt performance).


 I am running 1.0.0 (CDH5) so ttl setting is redundant? But you are right,
 unless I have memory issues, more aggressive pruning won't help.

 Thanks,

 Tim




 TD


 On Thu, Aug 28, 2014 at 3:16 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 In my streaming app, I receive from kafka where I have tried setting
 the partitions when calling createStream or later, by calling 
 repartition
 - in both cases, the number of nodes running the tasks seems to be
 stubbornly stuck at 2. Since I have 11 nodes in my cluster, I was hoping 
 to
 use more nodes.

 I am starting the job as:
 nohup spark-submit --class logStreamNormalizer --master yarn
 log-stream-normalizer_2.10-1.0.jar --jars
 spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
 --executor-memory 30G --spark.cleaner.ttl 60 --executor-cores 8
 --num-executors 8 > normRunLog-6.log 2>normRunLogError-6.log & echo $! >
 run-6.pid

 My main code is:
  val sparkConf = new SparkConf().setAppName(SparkKafkaTest)
  val ssc = new StreamingContext(sparkConf,Seconds(5))
  val kInMsg =
 KafkaUtils.createStream(ssc,node-nn1-1:2181/zk_kafka,normApp,Map(rawunstruct
 - 16))

  val propsMap = Map(metadata.broker.list -
 node-dn1-6:9092,node-dn1-7:9092,node-dn1-8:9092, serializer.class -
 kafka.serializer.StringEncoder, producer.type - async,
 request.required.acks - 1)
  val to_topic = normStruct
  val writer = new KafkaOutputService(to_topic, propsMap)


  if (!configMap.keySet.isEmpty)
  {
   //kInMsg.repartition(8)
   val outdata = kInMsg.map(x=normalizeLog(x._2,configMap))
   outdata.foreachRDD((rdd,time) = { rdd.foreach(rec = {
 writer.output(rec) }) } )
  }

  ssc.start()
  ssc.awaitTermination()

 In terms of total delay, with a 5 second batch, the delays usually stay
 under 5 seconds, but sometimes jump to ~10 seconds. As a performance 
 tuning
 question, does this mean, I can reduce my cleaner ttl from 60 to say 25
 (still more than double of the peak delay)?

 Thanks

 Tim









Re: Spark / Thrift / ODBC connectivity

2014-08-29 Thread Cheng Lian
You can use the Thrift server to access Hive tables that are located in the legacy
Hive warehouse and/or those generated by Spark SQL. Simba provides a Spark
SQL ODBC driver that enables applications like Tableau. But right now I'm
not 100% sure whether the driver has been officially released yet.


On Thu, Aug 28, 2014 at 9:42 PM, Denny Lee denny.g@gmail.com wrote:

 I’m currently using the Spark 1.1 branch and have been able to get the
 Thrift service up and running.  The quick questions were whether I should be
 able to use the Thrift service to connect to SparkSQL-generated tables
 and/or Hive tables?

 As well, by any chance do we have any documents that point to how we can
 connect something like Tableau to Spark SQL Thrift - similar to the SAP
 ODBC connectivity http://www.saphana.com/docs/DOC-472?

 Thanks!
 Denny




how can I get the number of cores

2014-08-29 Thread Kevin Jung
Hi all
Spark web ui gives me the information about total cores and used cores.
I want to get this information programmatically.
How can I do this?

Thanks
Kevin






Re: Spark SQL : how to find element where a field is in a given set

2014-08-29 Thread Michael Armbrust
To pass a list to a variadic function you can use the type ascription :_*

For example:

val longList = Seq[Expression]("a", "b", ...)
table("src").where('key in (longList: _*))

Also, note that I had to explicitly specify Expression as the type
parameter of Seq to ensure that the compiler converts "a" and "b" into
Spark SQL expressions.
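
If the values start out as an ordinary Scala collection, one hedged way to build that Seq[Expression] (a sketch, assuming Spark 1.0/1.1 catalyst and the same DSL implicits as above in scope) is to lift each element to a Literal:

import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}

val names = List("foo", "bar", "baz")                // an ordinary list of values
val exprs: Seq[Expression] = names.map(Literal(_))   // wrap each value as a Literal expression
personTable.where('name in (exprs: _*))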




On Thu, Aug 28, 2014 at 11:52 PM, Jaonary Rabarisoa jaon...@gmail.com
wrote:

 ok, but what if I have a long list do I need to hard code like this every
 element of my list of is there a function that translate a list into a
 tuple ?


 On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust mich...@databricks.com
 wrote:

 You don't need the Seq, as in is a variadic function.

  personTable.where('name in ("foo", "bar"))



 On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 Hi all,

 What is the expression that I should use with spark sql DSL if I need to
 retreive
 data with a field in a given set.
 For example :

 I have the following schema

 case class Person(name: String, age: Int)

 And I need to do something like :

  personTable.where('name in Seq("foo", "bar")) ?


 Cheers.


 Jaonary






Re: u'' notation with pyspark output data

2014-08-29 Thread Davies Liu
u'14.0' means a unicode string, you can convert into str by
u'14.0'.encode('utf8'), or you can convert it into float by
float(u'14.0')

Davies

On Thu, Aug 28, 2014 at 11:22 PM, Oleg Ruchovets oruchov...@gmail.com wrote:
 Hi ,
   I am working with pyspark and doing simple aggregation


 def doSplit(x):
     y = x.split(',')
     if len(y) == 3:
         return y[0], (y[1], y[2])

 counts = lines.map(doSplit).groupByKey()
 output = counts.collect()

 Iterating over the output I get data in the format u'1385501280', u'14.0',
 but I actually need to work with 14 instead of u'14.0' and 1385501280 instead
 of u'1385501280'.

 Question:
    how do I get the actual data without the u'' notation?


 Thanks
 Oleg.





Ensuring object in spark streaming runs on specific node

2014-08-29 Thread Filip Andrei
Say you have a spark streaming setup such as

JavaReceiverInputDStream... rndLists = jssc.receiverStream(new
JavaRandomReceiver(...));

rndLists.map(new NeuralNetMapper(...))
.foreach(new JavaSyncBarrier(...));

Is there any way of ensuring that, say, a JavaRandomReceiver and
JavaSyncBarrier get distributed to the same node ? Or is this even a
question that makes sense ?

Some information as to how spark-streaming distributes work across a cluster
would also be greatly appreciated.

( i've also asked this question on stackoverflow at
http://stackoverflow.com/questions/25564356/ensuring-object-in-spark-streaming-runs-on-specific-node
)






Running Spark On Yarn without Spark-Submit

2014-08-29 Thread Archit Thakur
Hi,

My requirement is to run Spark on Yarn without using the script
spark-submit.

I have a servlet and a tomcat server. As and when a request comes, it creates
a new SC and keeps it alive for further requests. I am setting my
master in sparkConf

as sparkConf.setMaster("yarn-cluster")

but the request is stuck indefinitely.

This works when I set
sparkConf.setMaster("yarn-client")

I am not sure, why is it not launching job in yarn-cluster mode.

Any thoughts?

Thanks and Regards,
Archit Thakur.


Re: how to specify columns in groupby

2014-08-29 Thread MEETHU MATHEW
Thank you Yanbo for the reply..

I've another query related to cogroup. I want to iterate over the results of 
the cogroup operation.

My code is 
* grp = RDD1.cogroup(RDD2)
* map((lambda (x,y): (x,list(y[0]),list(y[1]))), list(grp))
My result looks like :

[((u'764', u'20140826'), [0.70146274566650391], [ ]),
 ((u'863', u'20140826'), [0.368011474609375], [ ]),
 ((u'9571520', u'20140826'), [0.0046129226684570312], [0.60009])]
 
When I do one more cogroup operation like 

grp1 = grp.cogroup(RDD3)

I am not able to see the results. All my RDDs are of the form ((x,y),z). Can 
somebody help me to solve this?

Thanks  Regards, 
Meethu M


On Thursday, 28 August 2014 5:59 PM, Yanbo Liang yanboha...@gmail.com wrote:
 


For your reference:

val d1 = textFile.map(line => {
  val fileds = line.split(",")
  ((fileds(0), fileds(1)), fileds(2).toDouble)
})

val d2 = d1.reduceByKey(_ + _)
d2.foreach(println)




2014-08-28 20:04 GMT+08:00 MEETHU MATHEW meethu2...@yahoo.co.in:

Hi all,


I have an RDD  which has values in the  format id,date,cost.


I want to group the elements based on the id and date columns and get the sum 
of the cost  for each group.


Can somebody tell me how to do this?


 
Thanks  Regards, 
Meethu M

Re: Spark SQL : how to find element where a field is in a given set

2014-08-29 Thread Jaonary Rabarisoa
Still not working for me. I got a compilation error: value in is not a
member of Symbol. Any ideas?


On Fri, Aug 29, 2014 at 9:46 AM, Michael Armbrust mich...@databricks.com
wrote:

 To pass a list to a variadic function you can use the type ascription :_*

 For example:

 val longList = Seq[Expression]("a", "b", ...)
 table("src").where('key in (longList: _*))

 Also, note that I had to explicitly specify Expression as the type
 parameter of Seq to ensure that the compiler converts "a" and "b" into
 Spark SQL expressions.




 On Thu, Aug 28, 2014 at 11:52 PM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 ok, but what if I have a long list do I need to hard code like this every
 element of my list of is there a function that translate a list into a
 tuple ?


 On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust mich...@databricks.com
  wrote:

 You don't need the Seq, as in is a variadic function.

  personTable.where('name in ("foo", "bar"))



 On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 Hi all,

 What is the expression that I should use with spark sql DSL if I need
 to retreive
 data with a field in a given set.
 For example :

 I have the following schema

 case class Person(name: String, age: Int)

 And I need to do something like :

  personTable.where('name in Seq("foo", "bar")) ?


 Cheers.


 Jaonary







Re: Running Spark On Yarn without Spark-Submit

2014-08-29 Thread Archit Thakur
including user@spark.apache.org.


On Fri, Aug 29, 2014 at 2:03 PM, Archit Thakur archit279tha...@gmail.com
wrote:

 Hi,

 My requirement is to run Spark on Yarn without using the script
 spark-submit.

 I have a servlet and a tomcat server. As and when a request comes, it
 creates a new SC and keeps it alive for further requests. I am setting
 my master in sparkConf

 as sparkConf.setMaster("yarn-cluster")

 but the request is stuck indefinitely.

 This works when I set
 sparkConf.setMaster("yarn-client")

 I am not sure, why is it not launching job in yarn-cluster mode.

 Any thoughts?

 Thanks and Regards,
 Archit Thakur.






Re: How to debug this error?

2014-08-29 Thread Yanbo Liang
It's not allowed to use an RDD inside a map function.
RDDs can only be operated on at the driver of the Spark program.
In your case, the group RDD can't be found on the executors.

I guess you want to implement a subquery-like operation; try using
RDD.intersection() or join()
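
A hedged sketch of the join-based alternative, reusing the names from the quoted snippet below (c: RDD[String], group: RDD[(String, Iterable[String])], with the usual spark-shell imports in scope):

// Key c by its own value, then join with group; this pairs each key with its
// grouped values without referencing an RDD inside another RDD's closure.
val keyed = c.map(i => (i, ()))                          // RDD[(String, Unit)]
val d = keyed.join(group)                                // RDD[(String, (Unit, Iterable[String]))]
             .map { case (k, (_, vs)) => (k, vs) }
d.first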


2014-08-29 12:43 GMT+08:00 Gary Zhao garyz...@gmail.com:

 Hello

 I'm new to Spark and playing around, but saw the following error. Could
 anyone to help on it?

 Thanks
 Gary



 scala> c
 res15: org.apache.spark.rdd.RDD[String] = FlatMappedRDD[7] at flatMap at
 <console>:23

 scala> group
 res16: org.apache.spark.rdd.RDD[(String, Iterable[String])] =
 MappedValuesRDD[5] at groupByKey at <console>:19

 val d = c.map(i => group.filter(_._1 == i))

 d.first

 14/08/29 04:39:33 INFO TaskSchedulerImpl: Cancelling stage 28
 14/08/29 04:39:33 INFO DAGScheduler: Failed to run first at console:28
 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 28.0:180 failed 4 times, most recent failure: Exception failure in TID 3605
 on host mcs-spark-slave1-staging.snc1: java.lang.NullPointerException
 org.apache.spark.rdd.RDD.filter(RDD.scala:282)
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:25)
 $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:25)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
 scala.collection.Iterator$class.foreach(Iterator.scala:727)
 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

 scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)

 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)

 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
 scala.collection.TraversableOnce$class.to
 (TraversableOnce.scala:273)
 scala.collection.AbstractIterator.to(Iterator.scala:1157)

 scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
 scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)

 scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
 scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
 org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)
 org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1003)

 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)

 org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1083)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
 org.apache.spark.scheduler.Task.run(Task.scala:51)

 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:744)
 Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
  at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
  at
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at
 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
 at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
  at
 org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
 at scala.Option.foreach(Option.scala:236)
  at
 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
 at
 org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
  at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



RE: The concurrent model of spark job/stage/task

2014-08-29 Thread linkpatrickliu
Hi, 

I think an example will help illustrate the model better.

/*** SimpleApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "$YOUR_SPARK_HOME/README.md"
    val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME",
      List("target/scala-2.10/simple-project_2.10-1.0.jar"))
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).filter(line => line.contains("c")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

The example's DAG graph corresponds to your graph. Let's see how it works:
1. val sc = new SparkContext   // This line creates the SparkContext (which is the driver)
2. val numAs = logData.filter(line => line.contains("a")).filter(line => line.contains("c")).count()
   This is a job with 2 transformations and 1 action.
3. val numBs = logData.filter(line => line.contains("b")).count()
   This is another job with 1 transformation and 1 action.

Remember Scala's LAZY calculation strategy.

The job numAs will be calculated by invoking the count() method. It has 3 
stages: FilteredRDD(1) - FilteredRDD(2) - RDD.count()

(1) RDD.count() will submit it as the Final Stage to DAGScheduler.
(2) DAGScheduler analyses the dependency chain, and asks the RDD's parent FilteredRDD(2) 
to be computed first, and FilteredRDD(2) will ask its parent FilteredRDD(1) to be 
computed first. FilteredRDD(1) is the first, so it will be computed.
(3) Then DAGScheduler wraps FilteredRDD(1) as a TaskSet, and submits the TaskSet to 
TaskSchedulerImpl.
(4) Then TaskSchedulerImpl will schedule the TaskSet by the FIFO or FAIR strategy.
(5) The tasks in the TaskSet will be distributed to different Executors.
(6) After all the tasks of this TaskSet have finished, this Stage is marked finished. 
(The RDD will be cached by the BlockStore, so RDD data can be shared within this 
SparkContext. If you have a job numCs,
  val numCs = logData.filter(line => line.contains("a")).filter(line => line.contains("d")).count()
the first filter(line => line.contains("a")) can reuse the RDD data computed in numAs.)
(7) Then the FilteredRDD(2) will be computed. Then the RDD.count().
(8) Finally you have the result for numAs.

I think you now understand the submit/schedule/run process. Let's see the questions:

1. Each DAG graph is related to 1 action. You can write multiple actions in a 
spark application. If you want these actions to run simultaneously, you have to 
submit these actions in different threads.
2. I think you should pay attention to the FIFO or FAIR scheduler strategy. If the 
first action is too large, maybe the second action will be starved.
3. I think the question is how to persist the RDD data to local disk? You could use 
saveAsTextFile(path) or saveAsSequenceFile(path) to persist RDD data to local disk.
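
For point 2, a hedged sketch of switching the scheduler from the default FIFO to FAIR (standard Spark 1.x settings, not taken from this thread):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("MultiJobApp")
  .set("spark.scheduler.mode", "FAIR")            // default is FIFO
val sc = new SparkContext(conf)
// Optionally assign the jobs submitted from a given thread to a named pool:
sc.setLocalProperty("spark.scheduler.pool", "pool1")
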
Hope this will help you.
Best regards,
Patrick Liu

Date: Thu, 28 Aug 2014 23:34:29 -0700
From: ml-node+s1001560n13104...@n3.nabble.com
To: linkpatrick...@live.com
Subject: Re: RE: The concurrent model of spark job/stage/task




hi, dear
  Thanks for the response. Some comments below, and yes, I am using Spark on 
YARN.
1. The release doc of Spark says multiple jobs can be submitted in one application 
if the jobs (actions) are submitted by different threads. I wrote some Java thread 
code in the driver, one action in each thread, and the stages run concurrently, 
which I observed on the stages UI. In my understanding the DAGScheduler generates 
a different graph for each action. Not sure whether that is correct. Originally I 
was hoping the SparkContext could generate different jobs for unrelated actions, 
but I never got that to work.

2. If the DAGScheduler generates a graph as below, can 1 and 2 run concurrently?
3. I want to retrieve the original data out of the RDD and do other computation 
on the data, like getting the value of temperature or other fields, and working on them.


[hidden email]
 From: [hidden email]
 Date: 2014-08-29 14:01
 To: [hidden email]
 Subject: RE: The concurrent model of spark job/stage/task

Hi, 
Please see the answers following each question. If there's any mistake, please 
let me know. Thanks!
I am not sure which mode you are running, so I will assume you are using the 
spark-submit script to submit Spark applications to a Spark cluster 
(spark-standalone or YARN).
1. how to start 2 or more jobs in one spark driver, in java code.. I wrote 2 
actions in the code, but the jobs still show up as stages with index 0, 1, 2, 3... 
it looks like they run sequentially.
A Spark application is a job; you init the application by creating a SparkContext. 
The SparkContext will init the driver program for you. So if you want to run 
multiple jobs simultaneously, you have to split the jobs into different 
applications, and submit each of them.
The driver program is like an ApplicationMaster in YARN. It translates the Spark 
application into a DAG graph, and schedules each stage to workers. 

Re: Spark Hive max key length is 767 bytes

2014-08-29 Thread arthur.hk.c...@gmail.com
Hi,


Tried the same thing in HIVE directly without issue:

HIVE:
hive> create table test_datatype2 (testbigint bigint);
OK
Time taken: 0.708 seconds

hive> drop table test_datatype2;
OK
Time taken: 23.272 seconds



Then tried again in SPARK:
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
14/08/29 19:33:52 INFO Configuration.deprecation: 
mapred.reduce.tasks.speculative.execution is deprecated. Instead, use 
mapreduce.reduce.speculative
hiveContext: org.apache.spark.sql.hive.HiveContext = 
org.apache.spark.sql.hive.HiveContext@395c7b94

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
res0: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[0] at RDD at SchemaRDD.scala:104
== Query Plan ==
Native command: executed by Hive

scala> hiveContext.hql("drop table test_datatype3")

14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown while 
adding/validating class(es) : Specified key was too long; max key length is 767 
bytes
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was 
too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)

14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of 
org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted in no 
possible candidates
Error(s) were found while auto-creating/validating the datastore for classes. 
The errors are printed in the log, and are attached to this exception.
org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found while 
auto-creating/validating the datastore for classes. The errors are printed in 
the log, and are attached to this exception.
at 
org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609)


Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified 
key was too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)

14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
embedded-only so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so 
does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
embedded-only so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so 
does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
embedded-only so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so 
does not have its own datastore table.
14/08/29 19:34:25 ERROR DataNucleus.Datastore: An exception was thrown while 
adding/validating class(es) : Specified key was too long; max key length is 767 
bytes
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was 
too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)


Can anyone please help?

Regards
Arthur


On 29 Aug, 2014, at 12:47 pm, arthur.hk.c...@gmail.com 
arthur.hk.c...@gmail.com wrote:

 (Please ignore if duplicated) 
 
 
 Hi,
 
 I use Spark 1.0.2 with Hive 0.13.1
 
 I have already set the hive mysql database to latin1; 
 
 mysql:
 alter database hive character set latin1;
 
 Spark:
 scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 scala> hiveContext.hql("create table test_datatype1 (testbigint bigint)")
 scala> hiveContext.hql("drop table test_datatype1")
 
 
 14/08/29 12:31:55 INFO DataNucleus.Datastore: The class 
 org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
 embedded-only so does not have its own datastore table.
 14/08/29 12:31:55 INFO DataNucleus.Datastore: The class 
 org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only 
 so does not have its own datastore table.
 14/08/29 12:31:55 INFO DataNucleus.Datastore: The class 
 org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
 embedded-only so does not have its own datastore table.
 14/08/29 12:31:55 INFO DataNucleus.Datastore: The class 
 org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only 
 so does not have its own datastore table.
 14/08/29 12:31:59 ERROR DataNucleus.Datastore: An exception was thrown while 
 

Re: how to filter value in spark

2014-08-29 Thread marylucy
I see, it works well, thank you!!!

But in the following situation, how do I do it?

var a = sc.textFile("/sparktest/1/").map((_, "a"))
var b = sc.textFile("/sparktest/2/").map((_, "b"))

How to get (3,a) and (4,a)?
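
A hedged sketch of one way to do that with join (keys present in both files, value taken from a):

val a = sc.textFile("/sparktest/1/").map((_, "a"))
val b = sc.textFile("/sparktest/2/").map((_, "b"))
// join keeps only keys present in both RDDs; keep the value coming from a.
val common = a.join(b).map { case (k, (va, _)) => (k, va) }
common.foreach(println)   // prints (3,a) and (4,a)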


在 Aug 28, 2014,19:54,Matthew Farrellee m...@redhat.com 写道:

 On 08/28/2014 07:20 AM, marylucy wrote:
 fileA=1 2 3 4  one number a line,save in /sparktest/1/
 fileB=3 4 5 6  one number a line,save in /sparktest/2/
 I want to get 3 and 4
 
 var a = sc.textFile("/sparktest/1/").map((_, 1))
 var b = sc.textFile("/sparktest/2/").map((_, 1))
 
 a.filter(param => { b.lookup(param._1).length > 0 }).map(_._1).foreach(println)
 
 Error throw
 Scala.MatchError:Null
 PairRDDFunctions.lookup...
 
 the issue is nesting of the b rdd inside a transformation of the a rdd
 
 consider using intersection, it's more idiomatic
 
 a.intersection(b).foreach(println)
 
 but note that intersection will remove duplicates
 
 best,
 
 
 matt
 
 


Re: Running Spark On Yarn without Spark-Submit

2014-08-29 Thread Chester @work
Archit,
 We are using yarn-cluster mode, and calling spark via the Client class 
directly from the servlet server. It works fine. 
To establish a communication channel to give further requests: 
 it should be possible with yarn-client, but not with yarn-cluster. In yarn-client 
mode, the spark driver is outside the yarn cluster, so it can issue more 
commands. In yarn-cluster mode, all programs including the spark driver are running 
inside the yarn cluster. There is no communication channel with the client 
until the job finishes.

If your job is to keep the spark context alive and wait for other commands, then 
this should wait forever. 

I am actually working on some improvements on this and experimenting in our 
product; I will create PRs when I feel comfortable with the solution.

1) change Client API to allow the caller to know yarn app resource capacity 
before passing arguments
2) add YarnApplicationListener to the Client 
3) provide communication channel between application and spark Yarn client in 
cluster. 

The #1) is not directly related to the communication discussed here

#2) allows the application to have application life cycle callbacks (app 
start, end, in progress, failure, etc.) along with yarn resource allocations 

I changed #1 and #2 in forked spark, and it's worked well in cdh5, and I am 
testing against 2.0.5-alpha as well. 

For #3) I did not change in spark currently, as I am not sure the best approach 
yet. I put the change in the application runner which launch the spark yarn 
client in the cluster. 

The runner in the yarn cluster gets the application's host and port information from the 
passed configuration (args), then creates an Akka actor using the spark context 
actor system and sends a handshake message to the caller outside the cluster; 
after that you have a two-way communication channel. 

With this approach, I can send spark listener call backs to the app, error 
messages, app level messages etc. 

The runner inside the cluster can also receive requests from outside cluster 
such as stop. 

We are not sure the Akka approach is the best, so I am still experimenting with it. So 
far it does what we want.

Hope this helps

Chester


Sent from my iPhone

 On Aug 29, 2014, at 2:36 AM, Archit Thakur archit279tha...@gmail.com wrote:
 
 including user@spark.apache.org.
 
 
 On Fri, Aug 29, 2014 at 2:03 PM, Archit Thakur archit279tha...@gmail.com 
 wrote:
 Hi,
 
 My requirement is to run Spark on Yarn without using the script spark-submit.
 
 I have a servlet and a tomcat server. As and when a request comes, it creates 
 a new SC and keeps it alive for further requests. I am setting my master 
 in sparkConf
 
 as sparkConf.setMaster("yarn-cluster")
 
 but the request is stuck indefinitely. 
 
 This works when I set
 sparkConf.setMaster("yarn-client")
 
 I am not sure, why is it not launching job in yarn-cluster mode.
 
 Any thoughts?
 
 Thanks and Regards,
 Archit Thakur. 
 


Re: Where to save intermediate results?

2014-08-29 Thread huylv
Hi Daniel,

Your suggestion is definitely an interesting approach. In fact, I already
have another system to deal with the stream analytical processing part. So
basically, the Spark job to aggregate data just accumulatively computes
aggregations from historical data together with new batch, which has been
partly summarized by the stream processor. Answering queries involves in
combining pre-calculated historical data together with on-stream
aggregations. This sounds much like what Spark Streaming is intended to do.
So I'll take a look deeper into Spark Streaming to consider porting the
stream processing part to use Spark Streaming.

Regarding saving pre-calculated data to external storage (disk,
database...), I'm looking at Cassandra for now. But I don't know how it fits
into my context and how its performance compares to saving files in
HDFS. Also, is there any way to keep the pre-calculated data both on disk and
in memory, so that when the batch job terminates, historical data is still
available in memory for combining with the stream processor, while still being able
to survive a system failure or upgrade? Not to mention the size of the
pre-calculated data might get too big; in that case, storing only the newest data
in memory would be better. Tachyon looks like a nice option but again,
I don't have experience with it and it's still an experimental feature of
Spark.

Regards,
Huy






Re: how can I get the number of cores

2014-08-29 Thread Nicholas Chammas
What version of Spark are you running?

Try calling sc.defaultParallelism. I’ve found that it is typically set to
the number of worker cores in your cluster.
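
A minimal sketch (assuming Spark 1.x and a live SparkContext named sc):

// defaultParallelism usually reflects the total cores of the registered executors;
// getExecutorMemoryStatus lists the registered block managers (driver + executors).
val parallelism = sc.defaultParallelism
val blockManagers = sc.getExecutorMemoryStatus.size
println(s"defaultParallelism = $parallelism, registered block managers = $blockManagers")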
​


On Fri, Aug 29, 2014 at 3:39 AM, Kevin Jung itsjb.j...@samsung.com wrote:

 Hi all
 Spark web ui gives me the information about total cores and used cores.
 I want to get this information programmatically.
 How can I do this?

 Thanks
 Kevin







Re: Spark Streaming checkpoint recovery causes IO re-execution

2014-08-29 Thread Yana Kadiyska
I understand that the DB writes are happening from the workers unless you
collect. My confusion is that you believe workers recompute on recovery ("node
computations which get redone upon recovery"). My understanding is that
checkpointing dumps the RDD to disk and then cuts the RDD lineage. So I
thought on driver restart you'd get a set of new executor processes, but
they would read the last known state of the RDD from the HDFS checkpoint. Am I
off here?

So the only situation I can imagine where you end up recomputing is if you're
checkpointing at a larger interval than your batch size (i.e. the RDD on
disk does not reflect its last pre-crash state)?


On Thu, Aug 28, 2014 at 1:32 PM, RodrigoB rodrigo.boav...@aspect.com
wrote:

 Hi Yana,

 The fact is that the DB writing is happening on the node level and not on
 Spark level. One of the benefits of distributed computing nature of Spark
 is
 enabling IO distribution as well. For example, is much faster to have the
 nodes to write to Cassandra instead of having them all collected at the
 driver level and sending the writes from there.

 The problem is the node computations which get redone upon recovery. If
 these lambda functions send events to other systems, these events would get
 resent upon re-computation, causing overall system instability.

 Hope this helps you understand the problem.

 tnks,
 Rod







Re: Spark webUI - application details page

2014-08-29 Thread Brad Miller
How did you specify the HDFS path?  When I put

spark.eventLog.dir   hdfs://crosby.research.intel-research.net:54310/tmp/spark-events

in my spark-defaults.conf file, I receive the following error:

An error occurred while calling
None.org.apache.spark.api.java.JavaSparkContext.
: java.io.IOException: Call to
crosby.research.intel-research.net/10.212.84.53:54310 failed on local
exception: java.io.EOFException

-Brad


On Thu, Aug 28, 2014 at 12:26 PM, SK skrishna...@gmail.com wrote:

 I was able to recently solve this problem for standalone mode. For this
 mode,
 I did not use a history server. Instead, I set spark.eventLog.dir (in
 conf/spark-defaults.conf) to a directory in hdfs (basically this directory
 should be in a place that is writable by the master and accessible globally
 to all the nodes).







Spark Streaming reset state

2014-08-29 Thread Eko Susilo
Hi all,

I would like to ask for some advice about resetting a spark stateful operation.
So I tried like this:

JavaStreamingContext jssc = new JavaStreamingContext(context, new Duration(5000));
jssc.remember(new Duration(5 * 60 * 1000));
jssc.checkpoint(ApplicationConstants.HDFS_STREAM_DIRECTORIES);

JavaPairReceiverInputDStream<String, String> messages =
    (JavaPairReceiverInputDStream<String, String>)
    KafkaUtils.createStream(jssc, "localhost:2181", "test-consumer-group", topicMap);

JavaPairDStream<String, String> windowed = messages.window(WINDOW_LENGTH, SLIDE_INTERVAL);

JavaDStream<LogEntry> lines = windowed.map(new Function<Tuple2<String, String>, LogEntry>() {
    @Override
    public LogEntry call(Tuple2<String, String> tuple2) {
        LogEntry _Result = Utils.parseLine(tuple2._2());
        return _Result;
    }
}).filter(Functions.FILTER_LOG_ENTRY).cache();

JavaPairDStream<String, Long> codes = lines.mapToPair(Functions.GET_CODE)
    .reduceByKey(Functions.SUM_REDUCER)
    .updateStateByKey(COMPUTE_RUNNING_SUM);
I thought that by setting remember to 5 minutes, the codes RDD
derived from messages would also be reset after 5 minutes, but in fact it is not.

Is there any way to reset the codes RDD after a period of time (5
minutes)?

Thanks



-- 
Best Regards,
Eko Susilo


Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread bharatvenkat
Chris,

I did the Dstream.repartition mentioned in the document on parallelism in
receiving, as well as set spark.default.parallelism and it still uses only
2 nodes in my cluster.  I notice there is another email thread on the same
topic:

http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html

My code is in Java and here is what I have:

    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(ssc, zkQuorum,
            "cse-job-play-consumer", kafkaTopicMap);

    JavaPairDStream<String, String> newMessages =
        messages.repartition(partitionSize); // partitionSize=30

    JavaDStream<String> lines = newMessages.map(new Function<Tuple2<String, String>, String>() {
        ...

        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });

    JavaDStream<String> words = lines.flatMap(new MetricsComputeFunction());

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
            ...
        }
    );

    wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {...});

Thanks,
Bharat






/tmp/spark-events permissions problem

2014-08-29 Thread Brad Miller
Hi All,

Yesterday I restarted my cluster, which had the effect of clearing /tmp.
 When I brought Spark back up and ran my first job, /tmp/spark-events was
re-created and the job ran fine.  I later learned that other users were
receiving errors when trying to create a spark context.  It turned out the
reason was that only my user was able to create subdirectories within
/tmp/spark-events.

I believe /tmp/spark-events originally had ownership bmiller1:bmiller1
(where bmiller1 is my username) with permissions 770.  Once I modified
the permission to allow other users to create subdirectories other users
were again able to launch jobs.

Note that I think this may be related to some problems I am having viewing
application history (see link).
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-td3490.html#a13130

Has anybody else experienced a problem with permissions on the
spark.eventLog.dir directory?

best,
-Brad


Re: Spark SQL : how to find element where a field is in a given set

2014-08-29 Thread Michael Armbrust
What version are you using?


On Fri, Aug 29, 2014 at 2:22 AM, Jaonary Rabarisoa jaon...@gmail.com
wrote:

 Still not working for me. I got a compilation error : *value in is not a
 member of Symbol.* Any ideas ?


 On Fri, Aug 29, 2014 at 9:46 AM, Michael Armbrust mich...@databricks.com
 wrote:

 To pass a list to a variadic function you can use the type ascription :_*

 For example:

 val longList = Seq[Expression]("a", "b", ...)
 table("src").where('key in (longList: _*))

 Also, note that I had to explicitly specify Expression as the type
 parameter of Seq to ensure that the compiler converts "a" and "b" into
 Spark SQL expressions.




 On Thu, Aug 28, 2014 at 11:52 PM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 ok, but what if I have a long list do I need to hard code like this
 every element of my list of is there a function that translate a list into
 a tuple ?


 On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust 
 mich...@databricks.com wrote:

 You don't need the Seq, as in is a variadic function.

 personTable.where('name in ("foo", "bar"))



 On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 Hi all,

 What is the expression that I should use with the Spark SQL DSL if I need
 to retrieve data with a field in a given set.
 For example :

 I have the following schema

 case class Person(name: String, age: Int)

 And I need to do something like :

 personTable.where('name in Seq("foo", "bar"))?


 Cheers.


 Jaonary








Re: Spark Streaming reset state

2014-08-29 Thread Sean Owen
codes is a DStream, not an RDD. The remember() method controls how
long Spark Streaming holds on to the RDDs themselves. Can you clarify what you
mean by reset? codes provides a stream of RDDs that contain your
computation over a window of time. New RDDs come with the computation
over new data.

On Fri, Aug 29, 2014 at 4:30 PM, Eko Susilo
eko.harmawan.sus...@gmail.com wrote:
 Hi all,

 I would like to ask some advice about resetting spark stateful operation.
 so i tried like this:

 JavaStreamingContext jssc = new JavaStreamingContext(context, new Duration(5000));
 jssc.remember(new Duration(5 * 60 * 1000));
 jssc.checkpoint(ApplicationConstants.HDFS_STREAM_DIRECTORIES);
 JavaPairReceiverInputDStream<String, String> messages =
     (JavaPairReceiverInputDStream<String, String>)
     KafkaUtils.createStream(jssc, "localhost:2181", "test-consumer-group", topicMap);
 JavaPairDStream<String, String> windowed = messages.window(WINDOW_LENGTH, SLIDE_INTERVAL);
 JavaDStream<LogEntry> lines = windowed.map(new Function<Tuple2<String, String>, LogEntry>() {
     @Override public LogEntry call(Tuple2<String, String> tuple2) {
         LogEntry _Result = Utils.parseLine(tuple2._2());
         return _Result;
     }
 }).filter(Functions.FILTER_LOG_ENTRY).cache();

 JavaPairDStream<String, Long> codes = lines.mapToPair(Functions.GET_CODE)
     .reduceByKey(Functions.SUM_REDUCER)
     .updateStateByKey(COMPUTE_RUNNING_SUM);
 i thought by setting the remember to 5 minutes, the codes RDD that derived
 from messages would also be reseted in 5 minutes, but in fact no.

 Is there any way to reset the codes RDD after a period of time (5
 minutes)?

 Thanks



 --
 Best Regards,
 Eko Susilo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Jonathan Hodges
'this 2-node replication is mainly for failover in case the receiver dies
while data is in flight.  there's still chance for data loss as there's no
write ahead log on the hot path, but this is being addressed.'

Can you comment a little on how this will be addressed, will there be a
durable WAL?  Is there a JIRA for tracking this effort?

I am curious without WAL if you can avoid this data loss with explicit
management of Kafka offsets e.g. don't commit offset unless data is
replicated to multiple nodes or maybe not until processed.  The incoming
data will always be durably stored to disk in Kafka so can be replayed in
failure scenarios to avoid data loss if the offsets are managed properly.




On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly ch...@fregly.com wrote:

 @bharat-

 overall, i've noticed a lot of confusion about how Spark Streaming scales
 - as well as how it handles failover and checkpointing, but we can discuss
 that separately.

 there's actually 2 dimensions to scaling here:  receiving and processing.

 *Receiving*
 receiving can be scaled out by submitting new DStreams/Receivers to the
 cluster as i've done in the Kinesis example.  in fact, i purposely chose to
 submit multiple receivers in my Kinesis example because i feel it should be
 the norm and not the exception - particularly for partitioned and
 checkpoint-capable streaming systems like Kafka and Kinesis.   it's the
 only way to scale.

 a side note here is that each receiver running in the cluster will
 immediately replicate to 1 other node for fault-tolerance of that specific
 receiver.  this is where the confusion lies.  this 2-node replication is
 mainly for failover in case the receiver dies while data is in flight.
  there's still chance for data loss as there's no write ahead log on the
 hot path, but this is being addressed.

 this is mentioned in the docs here:
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 *Processing*
 once data is received, tasks are scheduled across the Spark cluster just
 like any other non-streaming task where you can specify the number of
 partitions for reduces, etc.  this is the part of scaling that is sometimes
 overlooked - probably because it works just like regular Spark, but it is
 worth highlighting.

 Here's a blurb in the docs:
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing

 the other thing that's confusing with Spark Streaming is that in Scala,
 you need to explicitly

 import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

 in order to pick up the implicits that allow DStream.reduceByKey and such
 (versus DStream.transform(rddBatch => rddBatch.reduceByKey()))

 in other words, DStreams appear to be relatively featureless until you
 discover this implicit.  otherwise, you need to operate on the underlying
 RDD's explicitly which is not ideal.

 the Kinesis example referenced earlier in the thread uses the DStream
 implicits.
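
 to make the receiving-side scaling and the implicit import concrete, here is a
 minimal scala sketch (zookeeper quorum, consumer group, topic and the various counts
 are made-up placeholders):

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
 import org.apache.spark.streaming.kafka.KafkaUtils

 val ssc = new StreamingContext(new SparkConf().setAppName("kafka-scaling-sketch"), Seconds(10))

 // one receiver per stream; several receivers spread ingestion across the cluster
 val streams = (1 to 4).map { _ =>
   KafkaUtils.createStream(ssc, "zkhost:2181", "my-consumer-group", Map("my-topic" -> 1))
 }

 // union the per-receiver streams, then repartition for the processing side
 val unified = ssc.union(streams).repartition(32)

 // reduceByKey is available here because of the toPairDStreamFunctions implicit above
 val counts = unified.map { case (_, msg) => (msg.length, 1L) }.reduceByKey(_ + _)
 counts.print()

 ssc.start()
 ssc.awaitTermination()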


 side note to all of this - i've recently convinced my publisher for my
 upcoming book, Spark In Action, to let me jump ahead and write the Spark
 Streaming chapter ahead of other more well-understood libraries.  early
 release is in a month or so.  sign up  @ http://sparkinaction.com if you
 wanna get notified.

 shameless plug that i wouldn't otherwise do, but i really think it will
 help clear a lot of confusion in this area as i hear these questions asked
 a lot in my talks and such.  and i think a clear, crisp story on scaling
 and fault-tolerance will help Spark Streaming's adoption.

 hope that helps!

 -chris




 On Wed, Aug 27, 2014 at 6:32 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 I agree. This issue should be fixed in Spark rather rely on replay of
 Kafka messages.

 Dib
 On Aug 28, 2014 6:45 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

 Dibyendu,

 Tnks for getting back.

 I believe you are absolutely right. We were under the assumption that the
 raw data was being computed again and that's not happening after further
 tests. This applies to Kafka as well.

 The issue is of major priority fortunately.

 Regarding your suggestion, I would maybe prefer to have the problem
 resolved
 within Spark's internals since once the data is replicated we should be
 able
 to access it once more and not have to pull it back again from Kafka or
 any other stream that is being affected by this issue. If for example
 there
 is a big amount of batches to be recomputed I would rather have them done
 distributed than overloading the batch interval with huge amount of Kafka
 messages.

 I do not yet have enough know-how about where the issue is or about the
 internal Spark code, so I can't really say how difficult the
 implementation will be.

 tnks,
 Rod



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12966.html
 

Re: Spark Streaming reset state

2014-08-29 Thread Eko Susilo
So codes currently holds RDDs containing codes and their respective
counters. I would like to find a way to reset those RDDs after some period of
time.
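
One hedged way to approximate this in Scala (a sketch, not an established pattern):
carry a creation timestamp in the state and have the update function drop or restart
an entry once it is older than the reset period. Alternatively, a 5-minute
reduceByKeyAndWindow gives a sliding total that never grows beyond the window.

import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.dstream.DStream

// perBatch stands in for the per-batch (code, count) stream produced by reduceByKey above.
def runningSumWithReset(perBatch: DStream[(String, Long)], resetMs: Long): DStream[(String, Long)] = {
  val update = (values: Seq[Long], state: Option[(Long, Long)]) => {
    val now = System.currentTimeMillis()
    state match {
      // reset: drop the old sum (or start over) once the entry is older than resetMs
      case Some((_, created)) if now - created > resetMs =>
        if (values.isEmpty) None else Some((values.sum, now))
      case Some((sum, created)) => Some((sum + values.sum, created))
      case None if values.nonEmpty => Some((values.sum, now))
      case None => None
    }
  }
  perBatch.updateStateByKey(update).map { case (k, (sum, _)) => (k, sum) }
}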


On Fri, Aug 29, 2014 at 5:55 PM, Sean Owen so...@cloudera.com wrote:

 codes is a DStream, not an RDD. The remember() method controls how
 long Spark Streaming holds on to the RDDs itself. Clarify what you
 mean by reset? codes provides a stream of RDDs that contain your
 computation over a window of time. New RDDs come with the computation
 over new data.

 On Fri, Aug 29, 2014 at 4:30 PM, Eko Susilo
 eko.harmawan.sus...@gmail.com wrote:
  Hi all,
 
  I would like to ask some advice about resetting spark stateful operation.
  so i tried like this:
 
  JavaStreamingContext jssc = new JavaStreamingContext(context, new
  Duration(5000));
  jssc.remember(Duration(5*60*1000));
  jssc.checkpoint(ApplicationConstants.HDFS_STREAM_DIRECTORIES);
  JavaPairReceiverInputDStreamString, String messages =
 (JavaPairReceiverInputDStreamString, String)
  KafkaUtils.createStream(jssc, localhost:2181, test-consumer-group,
  topicMap);
  JavaPairDStreamString,String windowed= messages.window(WINDOW_LENGTH,
  SLIDE_INTERVAL);
  JavaDStreamLogEntry lines = windowed.map(new FunctionTuple2String,
  String, LogEntry() { @Override public LogEntry call(Tuple2String,
 String
  tuple2) { LogEntry _Result=Utils.parseLine(tuple2._2()); return _Result;
 }
  }).filter(Functions.FILTER_LOG_ENTRY).cache();
 
  JavaPairDStreamString,Long codes=lines.mapToPair(Functions.GET_CODE).
  reduceByKey(Functions.SUM_REDUCER).
  updateStateByKey(COMPUTE_RUNNING_SUM);
  i thought by setting the remember to 5 minutes, the codes RDD that
 derived
  from messages would also be reseted in 5 minutes, but in fact no.
 
  Is there any way to reset the codes RDD after a period of time (5
  minutes)?
 
  Thanks
 
 
 
  --
  Best Regards,
  Eko Susilo




-- 
Best Regards,
Eko Susilo


RE: Q on downloading spark for standalone cluster

2014-08-29 Thread Sagar, Sanjeev
Hello Sparkies !

Could anyone please answer this? This is not a Hadoop cluster, so which 
download option should I use for a standalone cluster?

Also, what are the best practices if you have 1 TB of data and want to use Spark? 
Do you have to use Hadoop/CDH or some other option?

Appreciate it.

From: Sagar, Sanjeev [mailto:sanjeev.sa...@mypointscorp.com]
Sent: Thursday, August 28, 2014 2:44 PM
To: Daniel Siegmann
Cc: user@spark.apache.org
Subject: RE: Q on downloading spark for standalone cluster

Hello Daniel, If you're not using Hadoop, then why would you want to grab the Hadoop 
package? CDH5 will download all the Hadoop packages and Cloudera Manager too.

Just curious: what happens if you start Spark on an EC2 cluster, what does it choose 
as the default data store?

-Sanjeev

From: Daniel Siegmann [mailto:daniel.siegm...@velos.io]
Sent: Thursday, August 28, 2014 2:04 PM
To: Sagar, Sanjeev
Cc: user@spark.apache.org
Subject: Re: Q on downloading spark for standalone cluster

If you aren't using Hadoop, I don't think it matters which you download. I'd 
probably just grab the Hadoop 2 package.
Out of curiosity, what are you using as your data store? I get the impression 
most Spark users are using HDFS or something built on top.

On Thu, Aug 28, 2014 at 4:07 PM, Sanjeev Sagar 
sanjeev.sa...@mypointscorp.com wrote:
Hello there,

I have a basic question on the download: which option do I need to download for a 
standalone cluster?

I have a private cluster of three machines on CentOS. When I click on download it 
shows me the following:


   Download Spark

The latest release is Spark 1.0.2, released August 5, 2014 (release notes) 
http://spark.apache.org/releases/spark-release-1-0-2.html (git tag) 
https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=8fb6f00e195fb258f3f70f04756e07c259a2351f

Pre-built packages:

 * For Hadoop 1 (HDP1, CDH3): find an Apache mirror
   
http://www.apache.org/dyn/closer.cgi/spark/spark-1.0.2/spark-1.0.2-bin-hadoop1.tgz
   or direct file download
   http://d3kbcqa49mib13.cloudfront.net/spark-1.0.2-bin-hadoop1.tgz
 * For CDH4: find an Apache mirror
   
http://www.apache.org/dyn/closer.cgi/spark/spark-1.0.2/spark-1.0.2-bin-cdh4.tgz
   or direct file download
   http://d3kbcqa49mib13.cloudfront.net/spark-1.0.2-bin-cdh4.tgz
 * For Hadoop 2 (HDP2, CDH5): find an Apache mirror
   
http://www.apache.org/dyn/closer.cgi/spark/spark-1.0.2/spark-1.0.2-bin-hadoop2.tgz
   or direct file download
   http://d3kbcqa49mib13.cloudfront.net/spark-1.0.2-bin-hadoop2.tgz

Pre-built packages, third-party (NOTE: may include non ASF-compatible licenses):

 * For MapRv3: direct file download (external)
   http://package.mapr.com/tools/apache-spark/1.0.2/spark-1.0.2-bin-mapr3.tgz
 * For MapRv4: direct file download (external)
   http://package.mapr.com/tools/apache-spark/1.0.2/spark-1.0.2-bin-mapr4.tgz


From the above it looks like I have to download Hadoop or CDH4 first in 
order to use Spark? I have a standalone cluster and my data size is in the 
hundreds of gigabytes, close to a terabyte.

I don't get which one I need to download from the above list.

Could someone advise which one I need to download for a standalone 
cluster with a large data footprint?

Or is Hadoop needed or mandatory for using Spark? That's not my understanding. My 
understanding is that you can use Spark with Hadoop (YARN on Hadoop 2) if you like, 
but you can also use Spark standalone without Hadoop.

Please assist. I'm confused!

-Sanjeev


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



--
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Announce: Smoke - a web frontend to Spark

2014-08-29 Thread Horacio G. de Oro
Hi everyone! I've been working on Smoke, a web frontend to
interactively launch Spark jobs without compiling it (only support
Scala right now, and launching the jobs on yarn-client mode). It works
executing the Scala script using spark-shell in the Spark server.

It's developed in Python, uses Celery/Redis to stream the job logs to
the web and is easy to install.

The project is on GitHub: https://github.com/data-tsunami/smoke
(install instructions, screenshots, etc.). It's in an early
development stage, but very usable.

Thanks!

Horacio


--

Horacio G. de Oro
Data Tsunami

Email: hgde...@gmail.com
  Web: http://www.data-tsunami.com/english/
  Cel: +54 9 3572 525359
 LinkedIn: https://www.linkedin.com/in/hgdeoro

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Hive max key length is 767 bytes

2014-08-29 Thread Michael Armbrust
Spark SQL is based on Hive 12.  They must have changed the maximum key size
between 12 and 13.


On Fri, Aug 29, 2014 at 4:38 AM, arthur.hk.c...@gmail.com 
arthur.hk.c...@gmail.com wrote:

 Hi,


 Tried the same thing in HIVE directly without issue:

 HIVE:
 hive> create table test_datatype2 (testbigint bigint);
 OK
 Time taken: 0.708 seconds

 hive> drop table test_datatype2;
 OK
 Time taken: 23.272 seconds



 Then tried again in SPARK:
 scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 14/08/29 19:33:52 INFO Configuration.deprecation:
 mapred.reduce.tasks.speculative.execution is deprecated. Instead, use
 mapreduce.reduce.speculative
 hiveContext: org.apache.spark.sql.hive.HiveContext =
 org.apache.spark.sql.hive.HiveContext@395c7b94

 scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 res0: org.apache.spark.sql.SchemaRDD =
 SchemaRDD[0] at RDD at SchemaRDD.scala:104
 == Query Plan ==
 Native command: executed by Hive

 scala> hiveContext.hql("drop table test_datatype3")

 14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown
 while adding/validating class(es) : Specified key was too long; max key
 length is 767 bytes
 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key
 was too long; max key length is 767 bytes
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)

 14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of
 org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted
 in no possible candidates
 Error(s) were found while auto-creating/validating the datastore for
 classes. The errors are printed in the log, and are attached to this
 exception.
 org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found
 while auto-creating/validating the datastore for classes. The errors are
 printed in the log, and are attached to this exception.
  at
 org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609)


 Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException:
 Specified key was too long; max key length is 767 bytes
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)

 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class
 org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as
 embedded-only so does not have its own datastore table.
 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class
 org.apache.hadoop.hive.metastore.model.MOrder is tagged as
 embedded-only so does not have its own datastore table.
 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class
 org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as
 embedded-only so does not have its own datastore table.
 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class
 org.apache.hadoop.hive.metastore.model.MOrder is tagged as
 embedded-only so does not have its own datastore table.
 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class
 org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as
 embedded-only so does not have its own datastore table.
 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class
 org.apache.hadoop.hive.metastore.model.MOrder is tagged as
 embedded-only so does not have its own datastore table.
 14/08/29 19:34:25 ERROR DataNucleus.Datastore: An exception was thrown
 while adding/validating class(es) : Specified key was too long; max key
 length is 767 bytes
 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key
 was too long; max key length is 767 bytes
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)


 Can anyone please help?

 Regards
 Arthur


 On 29 Aug, 2014, at 12:47 pm, arthur.hk.c...@gmail.com 
 arthur.hk.c...@gmail.com wrote:

 (Please ignore if duplicated)


 Hi,

 I use Spark 1.0.2 with Hive 0.13.1

 I have already set the hive mysql database to latin1:

 mysql:
 alter database hive character set latin1;

 Spark:
 scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 scala> hiveContext.hql("create table test_datatype1 (testbigint bigint)")
 scala> hiveContext.hql("drop table test_datatype1")


 14/08/29 12:31:55 INFO DataNucleus.Datastore: The class
 org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as
 embedded-only so does not have its own datastore table.
 14/08/29 12:31:55 INFO DataNucleus.Datastore: The class
 org.apache.hadoop.hive.metastore.model.MOrder is tagged as
 embedded-only so does not have its own datastore table.
 14/08/29 12:31:55 INFO DataNucleus.Datastore: The class
 org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as
 embedded-only so does not have its own datastore table.
 14/08/29 12:31:55 INFO DataNucleus.Datastore: The class
 

Re: Change delimiter when collecting SchemaRDD

2014-08-29 Thread yadid ayzenberg
Thanks Michael, that makes total sense.
It works perfectly.

Yadid


On Thu, Aug 28, 2014 at 9:19 PM, Michael Armbrust mich...@databricks.com
wrote:

 The comma is just the way the default toString works for Row objects.
  Since SchemaRDDs are also RDDs, you can do arbitrary transformations on
 the Row objects that are returned.

 For example, if you'd rather the delimiter was '|':

 sql("SELECT * FROM src").map(_.mkString("|")).collect()


 On Thu, Aug 28, 2014 at 7:58 AM, yadid ayzenberg ya...@media.mit.edu
 wrote:

 Hi All,

 Is there any way to change the delimiter from being a comma ?
 Some of the strings in my data contain commas as well, making it very
 difficult to parse the results.

 Yadid





Problem Accessing Hive Table from hiveContext

2014-08-29 Thread Zitser, Igor
Hi All, 
New to spark and using Spark 1.0.2 and hive 0.12. 

If the Hive table is created as test_datatypes(testbigint bigint, ss bigint), then 
"select * from test_datatypes" from Spark works fine.

For create table test_datatypes(testbigint bigint, testdec decimal(5,2) )

scala> val dataTypes = hiveContext.hql("select * from test_datatypes")
14/08/28 21:18:44 INFO parse.ParseDriver: Parsing command: select * from 
test_datatypes
14/08/28 21:18:44 INFO parse.ParseDriver: Parse Completed
14/08/28 21:18:44 INFO analysis.Analyzer: Max iterations (2) reached for batch 
MultiInstanceRelations
14/08/28 21:18:44 INFO analysis.Analyzer: Max iterations (2) reached for batch 
CaseInsensitiveAttributeReferences
java.lang.IllegalArgumentException: Error: ',', ':', or ';' expected at 
position 14 from 'bigint:decimal(5,2)' [0:bigint, 6::, 7:decimal, 14:(, 15:5, 
16:,, 17:2, 18:)]
    at 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils$TypeInfoParser.parseTypeInfos(TypeInfoUtils.java:312)
    at 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils.getTypeInfosFromTypeString(TypeInfoUtils.java:716)
    at 
org.apache.hadoop.hive.serde2.lazy.LazyUtils.extractColumnInfo(LazyUtils.java:364)
    at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initSerdeParams(LazySimpleSerDe.java:288)
    at 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.initialize(LazySimpleSerDe.java:187)
    at 
org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:218)
    at 
org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:272)
    at 
org.apache.hadoop.hive.ql.metadata.Table.checkValidity(Table.java:175)
    at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:991)
    at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:924)
    at 
org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:58)
    at 
org.apache.spark.sql.hive.HiveContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:143)
    at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:122)
    at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:122)
    at scala.Option.getOrElse(Option.scala:120)
    at 
org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:122)
    at 
org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:149)
    at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$2.applyOrElse(Analyzer.scala:83)
    at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$2.applyOrElse(Analyzer.scala:81)
    at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
    at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:183)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:212)
    at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:168)
    at 
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156)


The same exception happens with a table created as "create table test_datatypes(testbigint 
bigint, testdate date)".

Thanks, Igor.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Too many open files

2014-08-29 Thread SK
Hi,

I am having the same problem reported by Michael. I am trying to open 30
files. ulimit -n  shows the limit is 1024. So I am not sure why the program
is failing with a "Too many open files" error. The total size of all the 30
files is 230 GB. 
I am running the job on a cluster with 10 nodes, each having 16 GB. The
error appears to be happening at the distinct() stage.

Here is my program. In the following code, are all the 10 nodes trying to
open all of the 30 files, or are the files distributed among the 10 nodes?

val baseFile = "/mapr/mapr_dir/files_2013apr*"
val x = sc.textFile(baseFile).map { line =>
  val fields = line.split("\t")
  (fields(11), fields(6))
}.distinct().countByKey()
val xrdd = sc.parallelize(x.toSeq)
xrdd.saveAsTextFile(...) 

Instead of using the glob *, I guess I can try using a for loop to read the
files one by one if that helps, but not sure if there is a more efficient
solution. 
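
Not a definitive fix, but besides raising ulimit -n on the worker machines, one knob
sometimes suggested for shuffle-heavy jobs on this generation of Spark is shuffle file
consolidation; a hedged sketch of setting it (whether it helps this particular job is
untested):

val conf = new org.apache.spark.SparkConf()
  .setAppName("distinct-countByKey-job")   // illustrative app name
  // fewer shuffle files are opened per reducer when consolidation is enabled (Spark 1.x setting)
  .set("spark.shuffle.consolidateFiles", "true")
val sc = new org.apache.spark.SparkContext(conf)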

The following is the error transcript: 

Job aborted due to stage failure: Task 1.0:201 failed 4 times, most recent
failure: Exception failure in TID 902 on host 192.168.13.11:
java.io.FileNotFoundException:
/tmp/spark-local-20140829131200-0bb7/08/shuffle_0_201_999 (Too many open
files) 
java.io.FileOutputStream.open(Native Method)
java.io.FileOutputStream.init(FileOutputStream.java:221)
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:116)
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:177)
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
org.apache.spark.util.collection.AppendOnlyMap$$anon$1.foreach(AppendOnlyMap.scala:159)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
org.apache.spark.scheduler.Task.run(Task.scala:51)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744) Driver stacktrace:





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-tp1464p13144.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Possible to make one executor be able to work on multiple tasks simultaneously?

2014-08-29 Thread Victor Tso-Guillen
I'm thinking of local mode where multiple virtual executors occupy the same
vm. Can we have the same configuration in spark standalone cluster mode?


Re: Possible to make one executor be able to work on multiple tasks simultaneously?

2014-08-29 Thread Matei Zaharia
Yes, executors run one task per core of your machine by default. You can also 
manually launch them with more worker threads than you have cores. What cluster 
manager are you on?

Matei

On August 29, 2014 at 11:24:33 AM, Victor Tso-Guillen (v...@paxata.com) wrote:

I'm thinking of local mode where multiple virtual executors occupy the same vm. 
Can we have the same configuration in spark standalone cluster mode?

Re: DStream repartitioning, performance tuning processing

2014-08-29 Thread Tim Smith
I wrote a long post about how I arrived here but in a nutshell I don't see
evidence of re-partitioning and workload distribution across the cluster.
My new fangled way of starting the job is:

run=`date +%m-%d-%YT%T`; \
nohup spark-submit --class logStreamNormalizer \
--master yarn log-stream-normalizer_2.10-1.0.jar \
--jars spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar \
--driver-memory 8G \
--executor-memory 30G \
--executor-cores 16 \
--num-executors 8 \
--spark.serializer org.apache.spark.serializer.KryoSerializer \
--spark.rdd.compress true \
--spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
--spark.akka.threads 16 \
--spark.task.maxFailures 64 \
--spark.scheduler.mode FAIR \
> logs/normRunLog-$run.log \
2> logs/normRunLogError-$run.log & \
echo $! > logs/run-$run.pid

Since the job spits out lots of logs, here is how I am trying to determine
if any tasks got assigned to non-local executors.
$ grep TID logs/normRunLogError-08-29-2014T18\:28\:32.log  | grep Starting
| grep -v NODE_LOCAL | grep -v PROCESS_LOCAL

Yields no lines.

If I look at resource pool usage in YARN, this app is assigned 252.5GB of
memory, 128 VCores and 9 containers. Am I missing something here?

Thanks,

Tim







On Thu, Aug 28, 2014 at 11:55 PM, Tim Smith secs...@gmail.com wrote:

 I set partitions to 64:

 //
  kInMsg.repartition(64)
  val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))
 //

 Still see all activity only on the two nodes that seem to be receiving
 from Kafka.

 On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith secs...@gmail.com wrote:
  TD - Apologies, didn't realize I was replying to you instead of the list.
 
  What does numPartitions refer to when calling createStream? I read an
  earlier thread that seemed to suggest that numPartitions translates to
  partitions created on the Spark side?
 
 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E
 
  Actually, I re-tried with 64 numPartitions in createStream and that
 didn't
  work. I will manually set repartition to 64/128 and see how that goes.
 
  Thanks.
 
 
 
 
  On Thu, Aug 28, 2014 at 5:42 PM, Tathagata Das 
 tathagata.das1...@gmail.com
  wrote:
 
  Having 16 partitions in KafkaUtils.createStream does not translate to
 the
  RDDs in Spark / Spark Streaming having 16 partitions. Repartition is the
  best way to distribute the received data between all the nodes, as long
 as
  there are sufficient number of partitions (try setting it to 2x the
 number
  cores given to the application).
 
  Yeah, in 1.0.0, ttl should be unnecessary.
 
 
 
  On Thu, Aug 28, 2014 at 5:17 PM, Tim Smith secs...@gmail.com wrote:
 
  On Thu, Aug 28, 2014 at 4:19 PM, Tathagata Das
  tathagata.das1...@gmail.com wrote:
 
  If you are repartitioning to 8 partitions, and your node happen to
 have
  at least 4 cores each, its possible that all 8 partitions are
 assigned to
  only 2 nodes. Try increasing the number of partitions. Also make sure
 you
  have executors (allocated by YARN) running on more than two nodes if
 you
  want to use all 11 nodes in your yarn cluster.
 
 
  If you look at the code, I commented out the manual re-partitioning to
 8.
  Instead, I am created 16 partitions when I call createStream. But I
 will
  increase the partitions to, say, 64 and see if I get better
 parallelism.
 
 
 
  If you are using Spark 1.x, then you dont need to set the ttl for
  running Spark Streaming. In case you are using older version, why do
 you
  want to reduce it? You could reduce it, but it does increase the risk
 of the
  premature cleaning, if once in a while things get delayed by 20
 seconds. I
  dont see much harm in keeping the ttl at 60 seconds (a bit of extra
 garbage
  shouldnt hurt performance).
 
 
  I am running 1.0.0 (CDH5) so ttl setting is redundant? But you are
 right,
  unless I have memory issues, more aggressive pruning won't help.
 
  Thanks,
 
  Tim
 
 
 
 
  TD
 
 
  On Thu, Aug 28, 2014 at 3:16 PM, Tim Smith secs...@gmail.com wrote:
 
  Hi,
 
  In my streaming app, I receive from kafka where I have tried setting
  the partitions when calling createStream or later, by calling
 repartition
  - in both cases, the number of nodes running the tasks seems to be
  stubbornly stuck at 2. Since I have 11 nodes in my cluster, I was
 hoping to
  use more nodes.
 
  I am starting the job as:
  nohup spark-submit --class logStreamNormalizer --master yarn
  log-stream-normalizer_2.10-1.0.jar --jars
 
 spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
  --executor-memory 30G --spark.cleaner.ttl 60 --executor-cores 8
  --num-executors 8 normRunLog-6.log 2normRunLogError-6.log  echo
 $! 
  run-6.pid
 
  My main code is:
   val sparkConf = new SparkConf().setAppName(SparkKafkaTest)
   val ssc = new 

Re: Possible to make one executor be able to work on multiple tasks simultaneously?

2014-08-29 Thread Victor Tso-Guillen
Standalone. I'd love to tell it that my one executor can simultaneously
serve, say, 16 tasks at once for an arbitrary number of distinct jobs.


On Fri, Aug 29, 2014 at 11:29 AM, Matei Zaharia matei.zaha...@gmail.com
wrote:

 Yes, executors run one task per core of your machine by default. You can
 also manually launch them with more worker threads than you have cores.
 What cluster manager are you on?

 Matei

 On August 29, 2014 at 11:24:33 AM, Victor Tso-Guillen (v...@paxata.com)
 wrote:

  I'm thinking of local mode where multiple virtual executors occupy the
 same vm. Can we have the same configuration in spark standalone cluster
 mode?




Re: Spark SQL : how to find element where a field is in a given set

2014-08-29 Thread Jaonary Rabarisoa
1.0.2


On Friday, August 29, 2014, Michael Armbrust mich...@databricks.com wrote:

 What version are you using?



 On Fri, Aug 29, 2014 at 2:22 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

 Still not working for me. I got a compilation error : *value in is not a
 member of Symbol.* Any ideas ?


 On Fri, Aug 29, 2014 at 9:46 AM, Michael Armbrust mich...@databricks.com wrote:

 To pass a list to a variadic function you can use the type ascription
 :_*

 For example:

 val longList = Seq[Expression](a, b, ...)
 table(src).where('key in (longList: _*))

 Also, note that I had to explicitly specify Expression as the type
 parameter of Seq to ensure that the compiler converts a and b into
 Spark SQL expressions.




 On Thu, Aug 28, 2014 at 11:52 PM, Jaonary Rabarisoa jaon...@gmail.com wrote:

 ok, but what if I have a long list do I need to hard code like this
 every element of my list of is there a function that translate a list into
 a tuple ?


 On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust mich...@databricks.com wrote:

 You don't need the Seq, as in is a variadic function.

 personTable.where('name in (foo, bar))



 On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

 Hi all,

 What is the expression that I should use with spark sql DSL if I need
 to retreive
 data with a field in a given set.
 For example :

 I have the following schema

 case class Person(name: String, age: Int)

 And I need to do something like :

 personTable.where('name in Seq(foo, bar)) ?


 Cheers.


 Jaonary









Re: DStream repartitioning, performance tuning processing

2014-08-29 Thread Tim Smith
Crash again. On the driver, logs say:
14/08/29 19:04:55 INFO BlockManagerMaster: Removed 7 successfully in
removeExecutor
org.apache.spark.SparkException: Job aborted due to stage failure: Task
2.0:0 failed 4 times, most recent failure: TID 6383 on host
node-dn1-2-acme.com failed for unknown reason
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


I go look at OS on node-dn1-2 and container logs for TID6383 but find
nothing.
# grep 6383 stderr
14/08/29 18:52:51 INFO CoarseGrainedExecutorBackend: Got assigned task 6383
14/08/29 18:52:51 INFO Executor: Running task ID 6383

However, the last message on the container is timestamped 19:04:51, which tells
me the executor was killed for some reason right before the driver noticed
that executor/task failure.

How come my task failed after only 4 attempts although my config says the failure
threshold is 64?
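
If the --spark.task.maxFailures flag is not actually being picked up by spark-submit
(worth verifying), a hedged sketch of setting the same properties inside the
application via SparkConf instead:

val conf = new org.apache.spark.SparkConf()
  .setAppName("logStreamNormalizer")
  .set("spark.task.maxFailures", "64")
  .set("spark.scheduler.mode", "FAIR")
val sc = new org.apache.spark.SparkContext(conf)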








On Fri, Aug 29, 2014 at 12:00 PM, Tim Smith secs...@gmail.com wrote:

 I wrote a long post about how I arrived here but in a nutshell I don't see
 evidence of re-partitioning and workload distribution across the cluster.
 My new fangled way of starting the job is:

 run=`date +%m-%d-%YT%T`; \
 nohup spark-submit --class logStreamNormalizer \
 --master yarn log-stream-normalizer_2.10-1.0.jar \
 --jars
 spark-streaming-kafka_2.10-1.0.0.jar,kafka_2.10-0.8.1.1.jar,zkclient-0.3.jar,metrics-core-2.2.0.jar,json4s-jackson_2.10-3.2.10.jar
 \
 --driver-memory 8G \
 --executor-memory 30G \
 --executor-cores 16 \
 --num-executors 8 \
 --spark.serializer org.apache.spark.serializer.KryoSerializer \
 --spark.rdd.compress true \
 --spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec \
 --spark.akka.threads 16 \
 --spark.task.maxFailures 64 \
 --spark.scheduler.mode FAIR \
 logs/normRunLog-$run.log \
 2logs/normRunLogError-$run.log  \
 echo $!  logs/run-$run.pid

 Since the job spits out lots of logs, here is how I am trying to determine
 if any tasks got assigned to non-local executors.
 $ grep TID logs/normRunLogError-08-29-2014T18\:28\:32.log  | grep Starting
 | grep -v NODE_LOCAL | grep -v PROCESS_LOCAL

 Yields no lines.

 If I look at resource pool usage in YARN, this app is assigned 252.5GB of
 memory, 128 VCores and 9 containers. Am I missing something here?

 Thanks,

 Tim







 On Thu, Aug 28, 2014 at 11:55 PM, Tim Smith secs...@gmail.com wrote:

 I set partitions to 64:

 //
  kInMsg.repartition(64)
  val outdata = kInMsg.map(x=normalizeLog(x._2,configMap))
 //

 Still see all activity only on the two nodes that seem to be receiving
 from Kafka.

 On Thu, Aug 28, 2014 at 5:47 PM, Tim Smith secs...@gmail.com wrote:
  TD - Apologies, didn't realize I was replying to you instead of the
 list.
 
  What does numPartitions refer to when calling createStream? I read an
  earlier thread that seemed to suggest that numPartitions translates to
  partitions created on the Spark side?
 
 http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201407.mbox/%3ccaph-c_o04j3njqjhng5ho281mqifnf3k_r6coqxpqh5bh6a...@mail.gmail.com%3E
 
  Actually, I re-tried with 64 numPartitions in createStream and that
 didn't
  work. I will manually set repartition to 64/128 and see how that goes.
 
  Thanks.
 
 
 
 
  On Thu, Aug 

Spark Streaming with Kafka, building project with 'sbt assembly' is extremely slow

2014-08-29 Thread Aris
Hi folks,

I am trying to use Kafka with Spark Streaming, and it appears I cannot do
the normal 'sbt package' as I do with other Spark applications, such as
Spark alone or Spark with MLlib. I learned I have to build with the
sbt-assembly plugin.

OK, so here is my build.sbt file for my extremely simple test Kafka/Spark
Streaming project. It takes almost 30 minutes to build! This is a CentOS
Linux machine on SSDs with 4 GB of RAM; it's never been slow for me. For
comparison, sbt assembly for the entire Spark project itself takes less than
10 minutes.

At the bottom of this file I am trying to play with 'cacheOutput' options,
because I read online that maybe I am calculating SHA-1 for all the *.class
files in this super JAR.

I also copied the mergeStrategy from Spark contributor TD Spark Streaming
tutorial from Spark Summit 2014.

Again, is there some better way to build this JAR file, just using sbt
package? This process is working, but it is very slow.

Any help with speeding up this compilation is really appreciated!!

Aris

-

import AssemblyKeys._ // put this at the top of the file

name := "streamingKafka"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.0.1"
)

assemblySettings

jarName in assembly := "streamingkafka-assembly.jar"

mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf")          => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$")      => MergeStrategy.discard
  case "log4j.properties"                                  => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf"                                    => MergeStrategy.concat
  case _                                                   => MergeStrategy.first
}

assemblyOption in assembly ~= { _.copy(cacheOutput = false) }


Re: SparkSql is slow over yarn

2014-08-29 Thread Nishkam Ravi
Can you share more details about your job, cluster properties and
configuration parameters?

Thanks,
Nishkam


On Fri, Aug 29, 2014 at 11:33 AM, Chirag Aggarwal 
chirag.aggar...@guavus.com wrote:

  When I run Spark SQL over YARN, it runs 2-4 times slower compared to
 when it is run in local mode. Please note that I have a four-node YARN setup.
 Has anyone else also witnessed the same?




Re: Anyone know hot to submit spark job to yarn in java code?

2014-08-29 Thread Archit Thakur
Hi,

I am facing the same problem.
Did you find any solution or work around?

Thanks and Regards,
Archit Thakur.


On Thu, Jan 16, 2014 at 6:22 AM, Liu, Raymond raymond@intel.com wrote:

 Hi

 Regarding your question

 1) When I run the above script, which jar is being submitted to the yarn
 server?

 What the SPARK_JAR env var points to and what --jar points to are both submitted to
 the yarn server

 2) It looks like the spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar
 plays the role of client side and
 spark-examples-assembly-0.8.1-incubating.jar goes with spark runtime and
 examples which will be running in yarn, am I right?

 The spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar will also go to the
 yarn cluster, as the runtime for the app
 jar (spark-examples-assembly-0.8.1-incubating.jar)

 3) Does anyone have any similar experience? I did lots of hadoop MR stuff
 and want to follow the same logic to submit a spark job. For now I can only find
 the command-line way to submit a spark job to yarn. I believe there is an easy
 way to integrate spark into a web application.

 You can use the yarn-client mode. You might want to take a look at docs/
 running-on-yarn.md, and probably try the master branch to check our latest
 updates to that part of the docs. In yarn-client mode, the SparkContext itself
 will do a similar thing to what the command line is doing to submit a yarn job.

 Then, to use it with Java, you might want to try out JavaSparkContext
 instead of SparkContext. I don't personally run it with complicated
 applications, but a small example app did work.
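
 As a rough illustration of the yarn-client route described above (the master string,
 jar path and input path are assumptions; check docs/running-on-yarn.md for your
 version, and make sure YARN_CONF_DIR points at the cluster configuration):

 import org.apache.spark.{SparkConf, SparkContext}

 // driver runs inside the web application JVM; executors are requested from YARN
 val conf = new SparkConf()
   .setMaster("yarn-client")
   .setAppName("web-submitted-job")
   .setJars(Seq("/path/to/your-app-assembly.jar"))   // illustrative path
 val sc = new SparkContext(conf)

 val count = sc.textFile("hdfs:///some/input").count()   // illustrative input
 println(count)
 sc.stop()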


 Best Regards,
 Raymond Liu

 -Original Message-
 From: John Zhao [mailto:jz...@alpinenow.com]
 Sent: Thursday, January 16, 2014 2:25 AM
 To: u...@spark.incubator.apache.org
 Subject: Anyone know hot to submit spark job to yarn in java code?

 Now I am working on a web application and  I want to  submit a spark job
 to hadoop yarn.
 I have already done my own assembly and can run it on the command line with the
 following script:

 export YARN_CONF_DIR=/home/gpadmin/clusterConfDir/yarn
 export
 SPARK_JAR=./assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar
 ./spark-class org.apache.spark.deploy.yarn.Client  --jar
 ./examples/target/scala-2.9.3/spark-examples-assembly-0.8.1-incubating.jar
 --class org.apache.spark.examples.SparkPi --args yarn-standalone
 --num-workers 3 --master-memory 1g --worker-memory 512m --worker-cores 1

 It works fine.
 Then I realized that it is hard to submit the job from a web application.
 It looks like the spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar or
 spark-examples-assembly-0.8.1-incubating.jar is a really big jar. I believe
 it contains everything.
 So my question is :
 1) When I run the above script, which jar is being submitted to the yarn
 server?
 2) It looks like the spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar
 plays the role of the client side and
 spark-examples-assembly-0.8.1-incubating.jar goes with the spark runtime and
 examples which will be running in yarn, am I right?
 3) Does anyone have any similar experience? I did lots of hadoop MR stuff
 and want to follow the same logic to submit a spark job. For now I can only find
 the command-line way to submit a spark job to yarn. I believe there is an easy
 way to integrate spark into a web application.


 Thanks.
 John.



[PySpark] large # of partitions causes OOM

2014-08-29 Thread Nick Chammas
Here’s a repro for PySpark:

a = sc.parallelize(["Nick", "John", "Bob"])
a = a.repartition(24000)
a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y).take(1)

When I try this on an EC2 cluster with 1.1.0-rc2 and Python 2.7, this is
what I get:

 >>> a = sc.parallelize(["Nick", "John", "Bob"])
 >>> a = a.repartition(24000)
 >>> a.keyBy(lambda x: len(x)).reduceByKey(lambda x,y: x + y).take(1)
 14/08/29 
 21:53:40 WARN BlockManagerMasterActor: Removing BlockManager 
 BlockManagerId(0, ip-10-138-29-167.ec2.internal, 46252, 0) with no recent 
 heart beats: 175143ms exceeds 45000ms14/08/29 21:53:50 WARN 
 BlockManagerMasterActor: Removing BlockManager BlockManagerId(10, 
 ip-10-138-18-106.ec2.internal, 33711, 0) with no recent heart beats: 
 175359ms exceeds 45000ms14/08/29 21:54:02 WARN BlockManagerMasterActor: 
 Removing BlockManager BlockManagerId(19, ip-10-139-36-207.ec2.internal, 
 52208, 0) with no recent heart beats: 173061ms exceeds 45000ms14/08/29 
 21:54:13 WARN BlockManagerMasterActor: Removing BlockManager 
 BlockManagerId(5, ip-10-73-142-70.ec2.internal, 56162, 0) with no recent 
 heart beats: 176816ms exceeds 45000ms14/08/29 21:54:22 WARN 
 BlockManagerMasterActor: Removing BlockManager BlockManagerId(7, 
 ip-10-236-145-200.ec2.internal, 40959, 0) with no recent heart beats: 
 182241ms exceeds 45000ms14/08/29 21:54:40 WARN BlockManagerMasterActor: 
 Removing BlockManager BlockManagerId(4, ip-10-139-1-195.ec2.internal, 
 49221, 0) with no recent heart beats: 178406ms exceeds 45000ms14/08/29 
 21:54:41 ERROR Utils: Uncaught exception in thread Result resolver thread-3
java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at 
org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:162)
at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
at 
org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:514)
at 
org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:355)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:68)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:47)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
at 
org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:46)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread Result resolver thread-3 14/08/29 21:56:26 ERROR
SendingConnection: Exception while reading SendingConnection to
ConnectionManagerId(ip-10-73-142-223.ec2.internal,54014)
java.nio.channels.ClosedChannelException
at sun.nio.ch.SocketChannelImpl.ensureReadOpen(SocketChannelImpl.java:252)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:295)
at org.apache.spark.network.SendingConnection.read(Connection.scala:390)
at 
org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:199)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.io.Input.readBytes(Input.java:296)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:35)
at 
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.read(DefaultArraySerializers.java:18)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at 
org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:162)
at 

Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Tim Smith
Good to see I am not the only one who cannot get incoming Dstreams to
repartition. I tried repartition(512) but still no luck - the app
stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
release notes for 1.0.1 and 1.0.2, I don't see anything that says this
was an issue and has been fixed.

How do I debug the repartition() statement to see what's the flow
after the job hits that statement?


On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat bvenkat.sp...@gmail.com wrote:
 Chris,

 I did the Dstream.repartition mentioned in the document on parallelism in
 receiving, as well as set spark.default.parallelism and it still uses only
 2 nodes in my cluster.  I notice there is another email thread on the same
 topic:

 http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html

 My code is in Java and here is what I have:

 JavaPairReceiverInputDStream<String, String> messages =
     KafkaUtils.createStream(ssc, zkQuorum, "cse-job-play-consumer", kafkaTopicMap);

 JavaPairDStream<String, String> newMessages =
     messages.repartition(partitionSize); // partitionSize=30

 JavaDStream<String> lines = newMessages.map(new Function<Tuple2<String, String>, String>() {
     ...
     public String call(Tuple2<String, String> tuple2) {
         return tuple2._2();
     }
 });

 JavaDStream<String> words = lines.flatMap(new MetricsComputeFunction());

 JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
     new PairFunction<String, String, Integer>() {
         ...
     }
 );

 wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>, Void>() {...});

 Thanks,
 Bharat



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [Spark Streaming] kafka consumer announce

2014-08-29 Thread Evgeniy Shishkin
TD, 

can you please comment on this code?

I am really interested in including this code in Spark.

But I am bothered by some points about persistence:
1. When we extend Receiver and call store,
is it a blocking call? Does it return only when Spark has stored the RDD as requested
(i.e. replicated or on disk)?
Or is there some buffer allowing data loss? In the latter case, can we have some
callback telling us to proceed with storing offsets?

2. I saw you implemented some rate limiting.
Can you clarify how it works? In the face of the network receiver getting data as
fast as it can, and you limiting this data in the BlockManager,
what happens with the excess data? Is it discarded? And if not, what happens?

There are a lot of open questions about how to make streaming reliable, and I have
plenty of questions off-list.
But I do not know how to improve the code without Spark support. 


 On 21 Aug 2014, at 16:17, Evgeniy Shishkin itparan...@gmail.com wrote:
 
 Hello,
 
 we are glad to announce yet another kafka input stream.
 
 Available at https://github.com/wgnet/spark-kafka-streaming
 
 It is used in production for about 3 months.
 We will be happy to hear your feedback.
 
 Custom Spark Kafka consumer based on Kafka SimpleConsumer API.
 
 Features
 
   • discover kafka metadata from zookeeper (more reliable than from 
 brokers, does not depend on broker list changes)
   • reading from multiple topics
   • reliably handles leader election and topic reassignment
   • saves offsets and stream metadata in hbase (more robust than 
 zookeeper)
   • supports metrics via spark metrics mechanism (jmx, graphite, etc.)
 Todo
 
   • abstract offset storage
   • time controlled offsets commit
   • refactor kafka message to rdd elements transformation (flatmapper 
 method)
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL : how to find element where a field is in a given set

2014-08-29 Thread Michael Armbrust
This feature was not part of that version.  It will be in 1.1.


On Fri, Aug 29, 2014 at 12:33 PM, Jaonary Rabarisoa jaon...@gmail.com
wrote:


 1.0.2


 On Friday, August 29, 2014, Michael Armbrust mich...@databricks.com
 wrote:

 What version are you using?



 On Fri, Aug 29, 2014 at 2:22 AM, Jaonary Rabarisoa jaon...@gmail.com wrote:

 Still not working for me. I got a compilation error : *value in is not
 a member of Symbol.* Any ideas ?


 On Fri, Aug 29, 2014 at 9:46 AM, Michael Armbrust 
 mich...@databricks.com wrote:

 To pass a list to a variadic function you can use the type ascription
 :_*

 For example:

 val longList = Seq[Expression]("a", "b", ...)
 table("src").where('key in (longList: _*))

 Also, note that I had to explicitly specify Expression as the type
 parameter of Seq to ensure that the compiler converts "a" and "b" into
 Spark SQL expressions.
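
A minimal plain-Scala illustration of the ": _*" ascription used above (the
where helper and the condition strings are made up for the example; they are
not part of the Spark SQL API):

    // a variadic function, analogous in shape to the DSL's in(...)
    def where(conditions: String*): String = conditions.mkString(" AND ")

    val fixed   = where("a = 1", "b = 2")          // literal varargs
    val many    = Seq("a = 1", "b = 2", "c = 3")
    val fromSeq = where(many: _*)                  // expand a Seq into the varargs slot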




 On Thu, Aug 28, 2014 at 11:52 PM, Jaonary Rabarisoa jaon...@gmail.com
 wrote:

 OK, but what if I have a long list? Do I need to hard code every element of
 my list like this, or is there a function that translates a list into
 a tuple?


 On Fri, Aug 29, 2014 at 3:24 AM, Michael Armbrust 
 mich...@databricks.com wrote:

 You don't need the Seq, as in is a variadic function.

 personTable.where('name in ("foo", "bar"))



 On Thu, Aug 28, 2014 at 3:09 AM, Jaonary Rabarisoa jaon...@gmail.com
  wrote:

 Hi all,

 What is the expression that I should use with the Spark SQL DSL if I
 need to retrieve
 data with a field in a given set?
 For example :

 I have the following schema

 case class Person(name: String, age: Int)

 And I need to do something like :

 personTable.where('name in Seq("foo", "bar")) ?


 Cheers.


 Jaonary










What is the better data structure in an RDD

2014-08-29 Thread cjwang
I need some advice regarding how data are stored in an RDD.  I have millions
of records, called Measures.  They are bucketed with keys of String type. 
I wonder if I need to store them as RDD[(String, Measure)] or RDD[(String,
Iterable[Measure])], and why?

Data in each bucket are not related most of the time.  The operations that I
often need to do are:

- Sort the Measures in each bucket separately
- Aggregate the Measures in each bucket separately
- Combine Measures in two RDDs into one based on their bucket keys
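
A minimal sketch of how those three operations look with the flat
RDD[(String, Measure)] layout (the Measure fields and the aggregation used here
are made-up placeholders): keeping the RDD flat lets the standard pair-RDD
operators do the per-bucket work, and the grouped form can still be derived
with groupByKey when a whole bucket is needed in memory at once.

    import org.apache.spark.SparkContext._   // pair-RDD implicits (needed on Spark 1.0.x)
    import org.apache.spark.rdd.RDD

    case class Measure(ts: Long, value: Double)   // hypothetical fields

    def perBucketOps(a: RDD[(String, Measure)], b: RDD[(String, Measure)]) = {
      // sort the Measures in each bucket separately
      val sorted: RDD[(String, Seq[Measure])] =
        a.groupByKey().mapValues(_.toSeq.sortBy(_.ts))

      // aggregate the Measures in each bucket separately (sum of values here)
      val aggregated: RDD[(String, Double)] =
        a.mapValues(_.value).reduceByKey(_ + _)

      // combine Measures from two RDDs based on their bucket keys
      val combined: RDD[(String, (Iterable[Measure], Iterable[Measure]))] =
        a.cogroup(b)

      (sorted, aggregated, combined)
    }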






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-is-the-better-data-structure-in-an-RDD-tp13159.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Tim Smith
I create my DStream very simply as:
val kInMsg =
KafkaUtils.createStream(ssc,"zkhost1:2181/zk_kafka","testApp",Map("rawunstruct"
-> 8))
.
.
eventually, before I operate on the DStream, I repartition it:
kInMsg.repartition(512)

Are you saying that ^^ repartition doesn't split my DStream into multiple
smaller streams? Should I manually create multiple DStreams like this?:
val kInputs = (1 to 10).map { _ => KafkaUtils.createStream() }

Then I apply some custom logic to it as:
val outdata = kInMsg.map(x => normalizeLog(x._2,configMap)) //where
normalizeLog takes a String and Map of regex and returns a string

In my case, I think I have traced the issue to the receiver executor being
killed by Yarn:
14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on
node-dn1-4-acme.com: remote Akka client disassociated

Could this be the root cause?
http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
https://issues.apache.org/jira/browse/SPARK-2121





On Fri, Aug 29, 2014 at 3:28 PM, Sean Owen so...@cloudera.com wrote:

 Are you using multiple Dstreams? repartitioning does not affect how
 many receivers you have. It's on 2 nodes for each receiver. You need
 multiple partitions in the queue, each consumed by a DStream, if you
 mean to parallelize consuming the queue.

 On Fri, Aug 29, 2014 at 11:08 PM, Tim Smith secs...@gmail.com wrote:
  Good to see I am not the only one who cannot get incoming Dstreams to
  repartition. I tried repartition(512) but still no luck - the app
  stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
  release notes for 1.0.1 and 1.0.2, I don't see anything that says this
  was an issue and has been fixed.
 
  How do I debug the repartition() statement to see what's the flow
  after the job hits that statement?
 
 
  On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat bvenkat.sp...@gmail.com
 wrote:
  Chris,
 
  I did the Dstream.repartition mentioned in the document on parallelism
 in
  receiving, as well as set spark.default.parallelism and it still uses
 only
  2 nodes in my cluster.  I notice there is another email thread on the
 same
  topic:
 
 
 http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html
 
  My code is in Java and here is what I have:
 
 JavaPairReceiverInputDStream<String, String> messages =
 
  KafkaUtils.createStream(ssc, zkQuorum,
  "cse-job-play-consumer", kafkaTopicMap);
 
  JavaPairDStream<String, String> newMessages =
  messages.repartition(partitionSize); // partitionSize=30
 
  JavaDStream<String> lines = newMessages.map(new
  Function<Tuple2<String, String>, String>() {
  ...
 
  public String call(Tuple2<String, String> tuple2) {
    return tuple2._2();
  }
 });
 
  JavaDStream<String> words = lines.flatMap(new
  MetricsComputeFunction()
  );
 
  JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
  new PairFunction<String, String, Integer>() {
 ...
  }
  );
 
   wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>,
  Void>() {...});
 
  Thanks,
  Bharat
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



Re: Spark Streaming reset state

2014-08-29 Thread Christophe Sebastien
You can use a tuple associating a timestamp with your running sum, and have
COMPUTE_RUNNING_SUM reset the running sum to zero when the timestamp is
more than 5 minutes old.
You'll still have a leak doing so if your keys keep changing, though.
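
A minimal Scala sketch of that idea (the LogEntry shape and all names below are
illustrative rather than the original Java code): the state kept per key is a
(windowStart, runningSum) pair, and the update function throws the accumulated
sum away once the pair is older than the reset interval.

    import org.apache.spark.streaming.StreamingContext._   // pair-DStream implicits on 1.0.x
    import org.apache.spark.streaming.dstream.DStream

    case class LogEntry(code: String)                       // hypothetical parsed record

    def runningSumWithReset(lines: DStream[LogEntry],
                            resetIntervalMs: Long = 5 * 60 * 1000L): DStream[(String, (Long, Long))] = {
      val update: (Seq[Long], Option[(Long, Long)]) => Option[(Long, Long)] =
        (newCounts, state) => {
          val now = System.currentTimeMillis()
          val (windowStart, sum) = state.getOrElse((now, 0L))
          // start a fresh window once the previous one is older than resetIntervalMs
          val (start, base) =
            if (now - windowStart > resetIntervalMs) (now, 0L) else (windowStart, sum)
          Some((start, base + newCounts.sum))
        }

      lines.map(e => (e.code, 1L))
           .reduceByKey(_ + _)
           .updateStateByKey(update)
    }

To avoid the leak mentioned above when keys keep changing, the update function
can also return None for keys that have been idle past the threshold;
updateStateByKey drops a key from the state when the function returns None.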

--Christophe


2014-08-29 9:00 GMT-07:00 Eko Susilo eko.harmawan.sus...@gmail.com:


 So codes currently holds RDDs containing codes and their respective
 counters. I would like to find a way to reset those RDDs after some period of
 time.


 On Fri, Aug 29, 2014 at 5:55 PM, Sean Owen so...@cloudera.com wrote:

 codes is a DStream, not an RDD. The remember() method controls how
 long Spark Streaming holds on to the RDDs itself. Clarify what you
 mean by reset? codes provides a stream of RDDs that contain your
 computation over a window of time. New RDDs come with the computation
 over new data.

 On Fri, Aug 29, 2014 at 4:30 PM, Eko Susilo
 eko.harmawan.sus...@gmail.com wrote:
  Hi all,
 
  I would like to ask some advice about resetting spark stateful
 operation.
  So I tried it like this:
 
  JavaStreamingContext jssc = new JavaStreamingContext(context, new Duration(5000));
  jssc.remember(new Duration(5*60*1000));
  jssc.checkpoint(ApplicationConstants.HDFS_STREAM_DIRECTORIES);
  JavaPairReceiverInputDStream<String, String> messages =
      (JavaPairReceiverInputDStream<String, String>)
      KafkaUtils.createStream(jssc, "localhost:2181", "test-consumer-group", topicMap);
  JavaPairDStream<String, String> windowed = messages.window(WINDOW_LENGTH, SLIDE_INTERVAL);
  JavaDStream<LogEntry> lines = windowed.map(new Function<Tuple2<String, String>, LogEntry>() {
      @Override public LogEntry call(Tuple2<String, String> tuple2) {
          LogEntry _Result = Utils.parseLine(tuple2._2());
          return _Result;
      }
  }).filter(Functions.FILTER_LOG_ENTRY).cache();
 
  JavaPairDStream<String, Long> codes = lines.mapToPair(Functions.GET_CODE).
      reduceByKey(Functions.SUM_REDUCER).
      updateStateByKey(COMPUTE_RUNNING_SUM);
  I thought that by setting remember to 5 minutes, the codes RDDs
 derived
  from messages would also be reset after 5 minutes, but in fact they are not.
 
  Is there any way to reset the codes RDD after a period of time (5
  minutes)?
 
  Thanks
 
 
 
  --
  Best Regards,
  Eko Susilo




 --
 Best Regards,
 Eko Susilo



Re: Possible to make one executor be able to work on multiple tasks simultaneously?

2014-08-29 Thread Victor Tso-Guillen
Any more thoughts on this? I'm not sure how to do this yet.


On Fri, Aug 29, 2014 at 12:10 PM, Victor Tso-Guillen v...@paxata.com
wrote:

 Standalone. I'd love to tell it that my one executor can simultaneously
 serve, say, 16 tasks at once for an arbitrary number of distinct jobs.


 On Fri, Aug 29, 2014 at 11:29 AM, Matei Zaharia matei.zaha...@gmail.com
 wrote:

 Yes, executors run one task per core of your machine by default. You can
 also manually launch them with more worker threads than you have cores.
 What cluster manager are you on?

 Matei

 On August 29, 2014 at 11:24:33 AM, Victor Tso-Guillen (v...@paxata.com)
 wrote:

  I'm thinking of local mode where multiple virtual executors occupy the
 same vm. Can we have the same configuration in spark standalone cluster
 mode?





Re: Low Level Kafka Consumer for Spark

2014-08-29 Thread Tim Smith
Ok, so I did this:
val kInStreams = (1 to 10).map{ _ =>
KafkaUtils.createStream(ssc,"zkhost1:2181/zk_kafka","testApp",Map("rawunstruct"
-> 1)) }
val kInMsg = ssc.union(kInStreams)
val outdata = kInMsg.map(x => normalizeLog(x._2,configMap))

This has improved parallelism. Earlier I would only get a Stream 0. Now I
have Streams [0-9]. Of course, since the kafka topic has only three
partitions, only three of those streams are active but I am seeing more
blocks being pulled across the three streams in total than what one was doing
earlier. Also, four nodes are actively processing tasks (vs only two
earlier) now which actually has me confused. If Streams are active only
on 3 nodes then how/why did a 4th node get work? If a 4th got work why
aren't more nodes getting work?
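
One thing worth noting here (a sketch with an illustrative partition count, not
a statement about this particular cluster): DStream.repartition returns a new
DStream rather than modifying the one it is called on, so the repartitioned
stream has to be the one the later operations are applied to, e.g.

    // receivers still run on the nodes hosting the input streams, but map
    // tasks over the repartitioned stream can land on any executor
    val kInMsg  = ssc.union(kInStreams).repartition(32)
    val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))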






On Fri, Aug 29, 2014 at 4:11 PM, Tim Smith secs...@gmail.com wrote:

 I create my DStream very simply as:
 val kInMsg =
 KafkaUtils.createStream(ssc,"zkhost1:2181/zk_kafka","testApp",Map("rawunstruct"
 -> 8))
 .
 .
 eventually, before I operate on the DStream, I repartition it:
 kInMsg.repartition(512)

 Are you saying that ^^ repartition doesn't split by dstream into multiple
 smaller streams? Should I manually create multiple Dstreams like this?:
 val kInputs = (1 to 10).map { _ => KafkaUtils.createStream() }

 Then I apply some custom logic to it as:
 val outdata = kInMsg.map(x => normalizeLog(x._2,configMap)) //where
 normalizeLog takes a String and Map of regex and returns a string

 In my case, I think I have traced the issue to the receiver executor being
 killed by Yarn:
 14/08/29 22:46:30 ERROR YarnClientClusterScheduler: Lost executor 1 on
 node-dn1-4-acme.com: remote Akka client disassociated

 This be the root cause?

 http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
 https://issues.apache.org/jira/browse/SPARK-2121





 On Fri, Aug 29, 2014 at 3:28 PM, Sean Owen so...@cloudera.com wrote:

 Are you using multiple Dstreams? repartitioning does not affect how
 many receivers you have. It's on 2 nodes for each receiver. You need
 multiple partitions in the queue, each consumed by a DStream, if you
 mean to parallelize consuming the queue.

 On Fri, Aug 29, 2014 at 11:08 PM, Tim Smith secs...@gmail.com wrote:
  Good to see I am not the only one who cannot get incoming Dstreams to
  repartition. I tried repartition(512) but still no luck - the app
  stubbornly runs only on two nodes. Now this is 1.0.0 but looking at
  release notes for 1.0.1 and 1.0.2, I don't see anything that says this
  was an issue and has been fixed.
 
  How do I debug the repartition() statement to see what's the flow
  after the job hits that statement?
 
 
  On Fri, Aug 29, 2014 at 8:31 AM, bharatvenkat bvenkat.sp...@gmail.com
 wrote:
  Chris,
 
  I did the Dstream.repartition mentioned in the document on parallelism
 in
  receiving, as well as set spark.default.parallelism and it still
 uses only
  2 nodes in my cluster.  I notice there is another email thread on the
 same
  topic:
 
 
 http://apache-spark-user-list.1001560.n3.nabble.com/DStream-repartitioning-performance-tuning-processing-td13069.html
 
  My code is in Java and here is what I have:
 
 JavaPairReceiverInputDStream<String, String> messages =
 
  KafkaUtils.createStream(ssc, zkQuorum,
  "cse-job-play-consumer", kafkaTopicMap);
 
  JavaPairDStream<String, String> newMessages =
  messages.repartition(partitionSize); // partitionSize=30
 
  JavaDStream<String> lines = newMessages.map(new
  Function<Tuple2<String, String>, String>() {
  ...
 
  public String call(Tuple2<String, String> tuple2) {
    return tuple2._2();
  }
 });
 
  JavaDStream<String> words = lines.flatMap(new
  MetricsComputeFunction()
  );
 
  JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
  new PairFunction<String, String, Integer>() {
 ...
  }
  );
 
   wordCounts.foreachRDD(new Function<JavaPairRDD<String, Integer>,
  Void>() {...});
 
  Thanks,
  Bharat
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p13131.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 





RE: [Streaming] Akka-based receiver with messages defined in uploaded jar

2014-08-29 Thread Anton Brazhnyk
Just checked it with 1.0.2
Still same exception.

From: Anton Brazhnyk [mailto:anton.brazh...@genesys.com]
Sent: Wednesday, August 27, 2014 6:46 PM
To: Tathagata Das
Cc: user@spark.apache.org
Subject: RE: [Streaming] Akka-based receiver with messages defined in uploaded 
jar

Sorry for the delay with answer – was on vacation.
As I said, I was using a modified version of the launcher from the example. 
The modification is just about setting the Spark master URL in the code so as not to use 
the run-example script.
The launcher itself was in the attached zip (attaching it once more) as 
ActorWordCount object.

From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Tuesday, August 05, 2014 11:32 PM
To: Anton Brazhnyk
Cc: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: [Streaming] Akka-based receiver with messages defined in uploaded 
jar

How are you launching/submitting the program? Using spark-submit? Or some other 
script (can you provide that)?

TD

On Tue, Aug 5, 2014 at 6:52 PM, Anton Brazhnyk 
anton.brazh...@genesys.com wrote:
I went through it once again to leave only the modification in question. Still 
the same exception.
I hope sources as a zip file (instead of github) can still be tolerated. :)

Here is the stacktrace generated with this sources:
14/08/05 18:45:54 DEBUG RecurringTimer: Callback for BlockGenerator called at 
time 1407289554800
14/08/05 18:45:54 ERROR Remoting: 
org.apache.spark.examples.streaming.CustomMessage
java.lang.ClassNotFoundException: 
org.apache.spark.examples.streaming.CustomMessage
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623)
at 
akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
at 
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at 
akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
at 
akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
at scala.util.Try$.apply(Try.scala:161)
at akka.serialization.Serialization.deserialize(Serialization.scala:98)
at 
akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
at 
akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
at 
akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
-Original Message-
From: Tathagata Das 
[mailto:tathagata.das1...@gmail.com]
Sent: Tuesday, August 05, 2014 5:42 PM
To: Anton Brazhnyk
Cc: user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: [Streaming] Akka-based receiver with messages defined in uploaded 
jar

 Can you show us the modified version? The reason could very well be what you 
suggest, but I want to understand what conditions lead to this.

TD

On Tue, Aug 5, 2014 at 3:55 PM, Anton Brazhnyk 
anton.brazh...@genesys.com wrote:
 Greetings,



 I modified ActorWordCount example a little and it uses simple case
 class as the 

Re: [Streaming] Akka-based receiver with messages defined in uploaded jar

2014-08-29 Thread Tathagata Das
Can you try adding the JAR to the class path of the executors directly, by
setting the config spark.executor.extraClassPath in the SparkConf? See the
Configuration page:
http://spark.apache.org/docs/latest/configuration.html#runtime-environment

I think what you guessed is correct. The Akka actor system is not aware of
the classes that are dynamically added when the custom jar is added with
setJar.
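
A minimal sketch of that setting on the driver side (the app name, master URL
and jar path below are placeholders, not from the original setup); the jar has
to already exist at that path on every executor node:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("ActorWordCount")
      .setMaster("spark://master-host:7077")                                  // placeholder master URL
      .set("spark.executor.extraClassPath", "/opt/jars/custom-messages.jar")  // placeholder path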

TD

On Fri, Aug 29, 2014 at 6:44 PM, Anton Brazhnyk anton.brazh...@genesys.com
wrote:

  Just checked it with 1.0.2

 Still same exception.



 *From:* Anton Brazhnyk [mailto:anton.brazh...@genesys.com]
 *Sent:* Wednesday, August 27, 2014 6:46 PM
 *To:* Tathagata Das
 *Cc:* user@spark.apache.org
 *Subject:* RE: [Streaming] Akka-based receiver with messages defined in
 uploaded jar



 Sorry for the delay with answer – was on vacation.

 As I said I was using modified version of launcher from the example.
 Modification is just about setting spark master URL in the code to not use
 run-example script.

 The launcher itself was in the attached zip (attaching it once more) as
 ActorWordCount object.



 *From:* Tathagata Das [mailto:tathagata.das1...@gmail.com]
 *Sent:* Tuesday, August 05, 2014 11:32 PM
 *To:* Anton Brazhnyk
 *Cc:* user@spark.apache.org
 *Subject:* Re: [Streaming] Akka-based receiver with messages defined in
 uploaded jar



 How are you launching/submitting the program? Using spark-submit? Or some
 other script (can you provide that)?



 TD



 On Tue, Aug 5, 2014 at 6:52 PM, Anton Brazhnyk anton.brazh...@genesys.com
 wrote:

 Went through it once again to leave the only modification in question.
 Still same exception.
 I hope sources as zip file (instead of github) still can be tolerated. :)

 Here is the stacktrace generated with this sources:
 14/08/05 18:45:54 DEBUG RecurringTimer: Callback for BlockGenerator called
 at time 1407289554800
 14/08/05 18:45:54 ERROR Remoting:
 org.apache.spark.examples.streaming.CustomMessage
 java.lang.ClassNotFoundException:
 org.apache.spark.examples.streaming.CustomMessage

 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:270)
 at
 java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623)
 at
 akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
 at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610)
 at
 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769)
 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at
 akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
 at
 akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
 at
 akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
 at scala.util.Try$.apply(Try.scala:161)
 at
 akka.serialization.Serialization.deserialize(Serialization.scala:98)
 at
 akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
 at
 akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
 at
 akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
 at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
 at
 akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 -Original Message-
 From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
 Sent: Tuesday, August 05, 2014 

Re: Too many open files

2014-08-29 Thread Ye Xianjin
Oops, the last reply didn't go to the user list. Mail app's fault.

Shuffling happens in the cluster, so you need to change the limit on all the nodes in the 
cluster.
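
Alongside raising the OS limit on every node, a small application-side sketch
(the app name and partition count are placeholders; spark.shuffle.consolidateFiles
applies to the hash-based shuffle in Spark 1.x) that reduces how many shuffle
files are open at once:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._   // pair-RDD implicits on 1.0.x

    val conf = new SparkConf()
      .setAppName("DistinctCountByKey")
      .set("spark.shuffle.consolidateFiles", "true")  // fewer shuffle files per executor
    val sc = new SparkContext(conf)

    val x = sc.textFile("/mapr/mapr_dir/files_2013apr*")
      .map { line =>
        val fields = line.split("\t")
        (fields(11), fields(6))
      }
      .distinct(64)   // fewer reduce partitions means fewer shuffle outputs per map task
      .countByKey()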



Sent from my iPhone

 On August 30, 2014, at 3:10, Sudha Krishna skrishna...@gmail.com wrote:
 
 Hi,
 
 Thanks for your response. Do you know if I need to change this limit on all 
 the cluster nodes or just the master?
 Thanks
 
 On Aug 29, 2014 11:43 AM, Ye Xianjin advance...@gmail.com wrote:
 A file descriptor limit of 1024 is most likely too small for Linux 
 machines in production. Try setting it to 65536, or unlimited if you can. The "too 
 many open files" error occurs because there are a lot of shuffle files (if I am 
 wrong, please correct me).
 
 Sent from my iPhone
 
  On August 30, 2014, at 2:06, SK skrishna...@gmail.com wrote:
 
  Hi,
 
  I am having the same problem reported by Michael. I am trying to open 30
  files. ulimit -n  shows the limit is 1024. So I am not sure why the program
  is failing with  Too many open files error. The total size of all the 30
  files is 230 GB.
  I am running the job on a cluster with 10 nodes, each having 16 GB. The
  error appears to be happening at the distinct() stage.
 
  Here is my program. In the following code, are all the 10 nodes trying to
  open all of the 30 files or are the files distributed among the 10 nodes?
 
  val baseFile = "/mapr/mapr_dir/files_2013apr*"
  val x = sc.textFile(baseFile).map { line =>
    val fields = line.split("\t")
    (fields(11), fields(6))
  }.distinct().countByKey()
  val xrdd = sc.parallelize(x.toSeq)
  xrdd.saveAsTextFile(...)
 
  Instead of using the glob *, I guess I can try using a for loop to read the
  files one by one if that helps, but not sure if there is a more efficient
  solution.
 
  The following is the error transcript:
 
  Job aborted due to stage failure: Task 1.0:201 failed 4 times, most recent
  failure: Exception failure in TID 902 on host 192.168.13.11:
  java.io.FileNotFoundException:
  /tmp/spark-local-20140829131200-0bb7/08/shuffle_0_201_999 (Too many open
  files)
  java.io.FileOutputStream.open(Native Method)
java.io.FileOutputStream.<init>(FileOutputStream.java:221)
  org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:116)
  org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:177)
  org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
  org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
  scala.collection.Iterator$class.foreach(Iterator.scala:727)
  org.apache.spark.util.collection.AppendOnlyMap$$anon$1.foreach(AppendOnlyMap.scala:159)
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
  org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  org.apache.spark.scheduler.Task.run(Task.scala:51)
  org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  java.lang.Thread.run(Thread.java:744) Driver stacktrace:
 
 
 
 
 
  --
  View this message in context: 
  http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-tp1464p13144.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org