Re: Launching Spark Cluster Application through IDE

2015-03-20 Thread Akhil Das
From IntelliJ, you can use the remote debugging feature.
http://stackoverflow.com/questions/19128264/how-to-remote-debug-in-intellij-12-1-4

For remote debugging, you need to pass the following:

-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=4000,suspend=n

as JVM options, and configure your IDE to attach to the given port (4000) for
remote debugging.
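
For example, one way to attach a debugger to the executors (rather than the
driver) is to pass those same options through spark.executor.extraJavaOptions
when submitting. This is only a sketch, and the master URL below is a placeholder:

./bin/spark-submit --class Application --master spark://master-host:7077 \
  --conf "spark.executor.extraJavaOptions=-Xdebug -Xrunjdwp:server=y,transport=dt_socket,address=4000,suspend=n" \
  ~/app.jar

With suspend=n the executor JVMs start normally and simply listen on port 4000
of their node for your IDE to attach.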

Thanks
Best Regards

On Fri, Mar 20, 2015 at 9:46 AM, raggy raghav0110...@gmail.com wrote:

 I am trying to debug a Spark Application on a cluster using a master and
 several worker nodes. I have been successful at setting up the master node
 and worker nodes using Spark standalone cluster manager. I downloaded the
 spark folder with binaries and use the following commands to setup worker
 and master nodes. These commands are executed from the spark directory.

 command for launching master

 ./sbin/start-master.sh
 command for launching worker node

 ./bin/spark-class org.apache.spark.deploy.worker.Worker master-URL
 command for submitting application

 ./bin/spark-submit --class Application --master URL ~/app.jar
 Now, I would like to understand the flow of control through the Spark source
 code on the worker nodes when I submit my application (I just want to use one
 of the given examples that uses reduce()). I am assuming I should set up Spark
 in Eclipse. The Eclipse setup link on the Apache Spark website seems to be
 broken. I would appreciate some guidance on setting up Spark and Eclipse to
 enable stepping through Spark source code on the worker nodes.

 If not Eclipse, I would be open to using some other IDE or approach that
 will enable me to step through Spark source code after launching my
 application.

 Thanks!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Launching-Spark-Cluster-Application-through-IDE-tp22155.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Visualizing Spark Streaming data

2015-03-20 Thread Harut
I'm trying to build a dashboard to visualize a stream of events coming from
mobile devices.
For example, I have an event called add_photo, from which I want to calculate
trending tags for added photos over the last x minutes. Then I'd like to
aggregate that by country, etc. I've built the streaming part, which reads
from Kafka, calculates the needed results and gets the appropriate RDDs; the
question now is how to connect it to a UI.

Are there any general practices on how to pass parameters to Spark from some
custom-built UI, how to organize data retrieval, what intermediate storage
to use, etc.?

Thanks in advance.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Powered by Spark addition

2015-03-20 Thread Ricardo Almeida
Hello Matei,

Could you please also add our company to the Powered By list?
Details are as follows:

Name: Act Now
URL: www.actnowib.com

Description:
Spark powers NOW APPS, a big data, real-time, predictive analytics
platform. It uses the Spark SQL, MLlib and GraphX components for both batch
ETL and analytics applied to telecommunication data, providing faster and
more meaningful insights and actionable data to the operators.






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Powered-by-Spark-addition-tp7422p22161.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Upgrade from Spark 1.1.0 to 1.1.1+ Issues

2015-03-20 Thread Akhil Das
Are you submitting your application from your local machine to a remote host?
If you want to run the Spark application from a remote machine, then you have
to at least set the following configurations properly:

 - *spark.driver.host* - points to the IP/host from which you are submitting
 the job (make sure you are able to ping this from the cluster)

 - *spark.driver.port* - set it to a port number which is accessible from
 the Spark cluster.

 You can look at more configuration options here:
http://spark.apache.org/docs/latest/configuration.html#networking
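
As a rough sketch of setting these programmatically (the hostname, port and app
name below are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("spark://cluster-master:7077")                // placeholder master URL
  .set("spark.driver.host", "my-workstation.example.com")  // machine you submit from, pingable from the cluster
  .set("spark.driver.port", "51000")                       // a port the cluster can connect back to
val sc = new SparkContext(conf)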


Thanks
Best Regards

On Fri, Mar 20, 2015 at 4:02 AM, Eason Hu eas...@gmail.com wrote:

 Hi Akhil,

 Thank you for your help. I just found that the problem was related to my
 local Spark application: I ran it in IntelliJ and didn't reload the project
 after recompiling the jar via Maven. If I don't reload, it uses some locally
 cached data to run the application, which leads to two different versions.
 After I reloaded the project and reran, it ran fine on v1.1.1 and I no longer
 saw the class-incompatibility issues.

 However, I now encounter a new issue starting from v1.2.0 and above.

 Using Spark's default log4j profile: 
 org/apache/spark/log4j-defaults.properties
 15/03/19 01:10:17 INFO CoarseGrainedExecutorBackend: Registered signal 
 handlers for [TERM, HUP, INT]
 15/03/19 01:10:17 WARN NativeCodeLoader: Unable to load native-hadoop library 
 for your platform... using builtin-java classes where applicable
 15/03/19 01:10:17 INFO SecurityManager: Changing view acls to: hduser,eason.hu
 15/03/19 01:10:17 INFO SecurityManager: Changing modify acls to: 
 hduser,eason.hu
 15/03/19 01:10:17 INFO SecurityManager: SecurityManager: authentication 
 disabled; ui acls disabled; users with view permissions: Set(hduser, 
 eason.hu); users with modify permissions: Set(hduser, eason.hu)
 15/03/19 01:10:18 INFO Slf4jLogger: Slf4jLogger started
 15/03/19 01:10:18 INFO Remoting: Starting remoting
 15/03/19 01:10:18 INFO Remoting: Remoting started; listening on addresses 
 :[akka.tcp://driverPropsFetcher@hduser-07:59122]
 15/03/19 01:10:18 INFO Utils: Successfully started service 
 'driverPropsFetcher' on port 59122.
 15/03/19 01:10:21 WARN ReliableDeliverySupervisor: Association with remote 
 system [akka.tcp://sparkDriver@192.168.1.53:65001] has failed, address is now 
 gated for [5000] ms. Reason is: [Association failed with 
 [akka.tcp://sparkDriver@192.168.1.53:65001]].
 15/03/19 01:10:48 ERROR UserGroupInformation: PriviledgedActionException 
 as:eason.hu (auth:SIMPLE) cause:java.util.concurrent.TimeoutException: 
 Futures timed out after [30 seconds]
 Exception in thread main java.lang.reflect.UndeclaredThrowableException: 
 Unknown exception in doAs
   at 
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1421)
   at 
 org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:59)
   at 
 org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:128)
   at 
 org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:224)
   at 
 org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
 Caused by: java.security.PrivilegedActionException: 
 java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
   at java.security.AccessController.doPrivileged(Native Method)
   at javax.security.auth.Subject.doAs(Subject.java:415)
   at 
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
   ... 4 more
 Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 
 seconds]
   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
   at 
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
   at 
 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
   at scala.concurrent.Await$.result(package.scala:107)
   at 
 org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:144)
   at 
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:60)
   at 
 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:59)
   ... 7 more

 Do you have any clues why this happens only on v1.2.0 and above? Nothing
 else changed.

 Thanks,
 Eason

 On Tue, Mar 17, 2015 at 8:39 PM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 It's clearly saying:

 java.io.InvalidClassException: org.apache.spark.storage.BlockManagerId;
 local class incompatible: stream classdesc serialVersionUID =
 2439208141545036836, local class serialVersionUID = -7366074099953117729

 That's a version incompatibility; can you double-check your version?
 On 18 Mar 2015 06:08, Eason Hu 

Re: Visualizing Spark Streaming data

2015-03-20 Thread Jeffrey Jedele
I'll stay with my recommendation - that's exactly what Kibana is made for ;)

2015-03-20 9:06 GMT+01:00 Harut Martirosyan harut.martiros...@gmail.com:

 Hey Jeffrey.
 Thanks for reply.

 I already have something similar, I use Grafana and Graphite, and for
 simple metric streaming we've got all set-up right.

 My question is about interactive patterns. For instance, dynamically
 choose an event to monitor, dynamically choose group-by field or any sort
 of filter, then view results. This is easy when you have 1 user, but if you
 have team of analysts all specifying their own criteria, it becomes hard to
 manage them all.

 On 20 March 2015 at 12:02, Jeffrey Jedele jeffrey.jed...@gmail.com
 wrote:

 Hey Harut,
 I don't think there'll by any general practices as this part heavily
 depends on your environment, skills and what you want to achieve.

 If you don't have a general direction yet, I'd suggest you to have a look
 at Elasticsearch+Kibana. It's very easy to set up, powerful and therefore
 gets a lot of traction currently.

 Regards,
 Jeff

 2015-03-20 8:43 GMT+01:00 Harut harut.martiros...@gmail.com:

 I'm trying to build a dashboard to visualize stream of events coming from
 mobile devices.
 For example, I have event called add_photo, from which I want to
 calculate
 trending tags for added photos for last x minutes. Then I'd like to
 aggregate that by country, etc. I've built the streaming part, which
 reads
 from Kafka, and calculates needed results and get appropriate RDDs, the
 question is now how to connect it to UI.

 Is there any general practices on how to pass parameters to spark from
 some
 custom built UI, how to organize data retrieval, what intermediate
 storages
 to use, etc.

 Thanks in advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 RGRDZ Harut



Re: Visualizing Spark Streaming data

2015-03-20 Thread Harut Martirosyan
Hey Jeffrey.
Thanks for the reply.

I already have something similar: I use Grafana and Graphite, and for
simple metric streaming we've got it all set up right.

My question is about interactive patterns. For instance, dynamically choosing
an event to monitor, dynamically choosing a group-by field or any sort of
filter, then viewing the results. This is easy when you have one user, but if you
have a team of analysts all specifying their own criteria, it becomes hard to
manage them all.

On 20 March 2015 at 12:02, Jeffrey Jedele jeffrey.jed...@gmail.com wrote:

 Hey Harut,
 I don't think there'll by any general practices as this part heavily
 depends on your environment, skills and what you want to achieve.

 If you don't have a general direction yet, I'd suggest you to have a look
 at Elasticsearch+Kibana. It's very easy to set up, powerful and therefore
 gets a lot of traction currently.

 Regards,
 Jeff

 2015-03-20 8:43 GMT+01:00 Harut harut.martiros...@gmail.com:

 I'm trying to build a dashboard to visualize stream of events coming from
 mobile devices.
 For example, I have event called add_photo, from which I want to calculate
 trending tags for added photos for last x minutes. Then I'd like to
 aggregate that by country, etc. I've built the streaming part, which reads
 from Kafka, and calculates needed results and get appropriate RDDs, the
 question is now how to connect it to UI.

 Is there any general practices on how to pass parameters to spark from
 some
 custom built UI, how to organize data retrieval, what intermediate
 storages
 to use, etc.

 Thanks in advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





-- 
RGRDZ Harut


Measure Bytes Read and Peak Memory Usage for Query

2015-03-20 Thread anu
Hi All

I would like to measure Bytes Read and Peak Memory Usage for a Spark SQL
query.

Please clarify: does Bytes Read equal the aggregate size of all RDDs?
All my RDDs are in memory, with 0 B spilled to disk.

And I am clueless about how to measure Peak Memory Usage.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Measuer-Bytes-READ-and-Peak-Memory-Usage-for-Query-tp22159.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Measure Bytes Read and Peak Memory Usage for Query

2015-03-20 Thread Akhil Das
You could cache the data and check the memory usage under the Storage tab in
the driver UI (which runs on port 4040).
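
For example, something along these lines (a sketch; the actual query is not
shown in the thread):

val result = sqlContext.sql("SELECT ...")  // your Spark SQL query
result.cache()
result.count()  // materialize it so its in-memory size shows up under the Storage tab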

Thanks
Best Regards

On Fri, Mar 20, 2015 at 12:02 PM, anu anamika.guo...@gmail.com wrote:

 Hi All

 I would like to measure Bytes Read and Peak Memory Usage for a Spark SQL
 Query.

 Please clarify if Bytes Read = aggregate size of all RDDs ??
 All my RDDs are in memory and 0B spill to disk.

 And I am clueless how to measure Peak Memory Usage.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Measuer-Bytes-READ-and-Peak-Memory-Usage-for-Query-tp22159.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Load balancing

2015-03-20 Thread Akhil Das
1. If you are consuming data from Kafka or any other receiver-based
source, then you can start 1-2 receivers per worker (assuming you'll have
at least 4 cores per worker).

2. If you have a single receiver or a fileStream, then what you can do to
distribute the data across machines is a repartition, as sketched below.
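
A rough sketch of both points (the ZooKeeper quorum, group id, topic name and
receiver count below are placeholders):

import org.apache.spark.streaming.kafka.KafkaUtils

// 1. Several receivers, one per createStream call, unioned into a single DStream
val numReceivers = 2
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zk-host:2181", "my-group", Map("events" -> 1))
}
val unified = ssc.union(kafkaStreams)

// 2. For a single receiver (or a fileStream), spread the blocks across the cluster
val distributed = unified.repartition(ssc.sparkContext.defaultParallelism)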

Thanks
Best Regards

On Thu, Mar 19, 2015 at 11:32 PM, Mohit Anchlia mohitanch...@gmail.com
wrote:

 I am trying to understand how to load balance the incoming data to
 multiple Spark Streaming workers. Could somebody help me understand how I
 can distribute my incoming data from various sources such that the incoming
 data goes to multiple Spark Streaming nodes? Is it done by the Spark client
 with the help of the Spark master, similar to a Hadoop client asking the
 namenode for the list of datanodes?



Re: Visualizing Spark Streaming data

2015-03-20 Thread Jeffrey Jedele
Hey Harut,
I don't think there'll be any general practices, as this part heavily
depends on your environment, skills and what you want to achieve.

If you don't have a general direction yet, I'd suggest you have a look
at Elasticsearch+Kibana. It's very easy to set up, powerful, and therefore
gets a lot of traction currently.

Regards,
Jeff

2015-03-20 8:43 GMT+01:00 Harut harut.martiros...@gmail.com:

 I'm trying to build a dashboard to visualize stream of events coming from
 mobile devices.
 For example, I have event called add_photo, from which I want to calculate
 trending tags for added photos for last x minutes. Then I'd like to
 aggregate that by country, etc. I've built the streaming part, which reads
 from Kafka, and calculates needed results and get appropriate RDDs, the
 question is now how to connect it to UI.

 Is there any general practices on how to pass parameters to spark from some
 custom built UI, how to organize data retrieval, what intermediate storages
 to use, etc.

 Thanks in advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark-submit and multiple files

2015-03-20 Thread Guillaume Charhon
Hi Davies,

I am already using --py-files. The system does use the other file. The
error I am getting is not trivial. Please check the error log.



On Thu, Mar 19, 2015 at 8:03 PM, Davies Liu dav...@databricks.com wrote:

 You could submit additional Python source via --py-files, for example:

 $ bin/spark-submit --py-files work.py main.py

 On Tue, Mar 17, 2015 at 3:29 AM, poiuytrez guilla...@databerries.com
 wrote:
  Hello guys,
 
  I am having a hard time understanding how spark-submit behaves with multiple
  files. I have created two code snippets. Each code snippet is composed of a
  main.py and a work.py. The code works if I paste work.py and then main.py into
  a pyspark shell. However, neither snippet works when using spark-submit, and
  they generate different errors.
 
  Function add_1 definition outside
  http://www.codeshare.io/4ao8B
  https://justpaste.it/jzvj
 
  Embedded add_1 function definition
  http://www.codeshare.io/OQJxq
  https://justpaste.it/jzvn
 
  I am trying to find a way to make it work.
 
  Thank you for your support.
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-and-multiple-files-tp22097.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



Spark 1.2. loses often all executors

2015-03-20 Thread mrm
Hi,

I recently changed from Spark 1.1 to Spark 1.2, and I noticed that it
loses all executors whenever I have any Python code bug (like looking up a
key in a dictionary that does not exist). In earlier versions, it would
raise an exception but it would not lose all executors.

Anybody with a similar problem?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-loses-often-all-executors-tp22162.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Load balancing

2015-03-20 Thread Jeffrey Jedele
Hi Mohit,
it also depends on what the source of your streaming application is.

If you use Kafka, you can easily partition topics and have multiple
receivers on different machines.

If you have something like an HTTP, socket, etc. stream, you probably can't do
that. However, the Spark RDDs generated by your receiver will still be
partitioned and processed in a distributed manner like usual Spark RDDs. There
are parameters to control that behavior (e.g. defaultParallelism and
blockInterval).
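
A quick sketch of where those knobs live in spark-defaults.conf (the values are
only illustrative):

spark.default.parallelism     48    # default number of partitions for shuffles/repartition
spark.streaming.blockInterval 200   # in ms; how often received data is cut into blocks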

See here for more details:
http://spark.apache.org/docs/1.2.1/streaming-programming-guide.html#performance-tuning

Regards,
Jeff

2015-03-20 8:02 GMT+01:00 Akhil Das ak...@sigmoidanalytics.com:

 1. If you are consuming data from Kafka or any other receiver based
 sources, then you can start 1-2 receivers per worker (assuming you'll have
 min 4 core per worker)

 2. If you are having single receiver or is a fileStream then what you can
 do to distribute the data across machines is to do a repartition.

 Thanks
 Best Regards

 On Thu, Mar 19, 2015 at 11:32 PM, Mohit Anchlia mohitanch...@gmail.com
 wrote:

 I am trying to understand how to load balance the incoming data to
 multiple spark streaming workers. Could somebody help me understand how I
 can distribute my incoming data from various sources such that incoming
 data is going to multiple spark streaming nodes? Is it done by spark client
 with help of spark master similar to hadoop client asking namenodes for the
 list of datanodes?





Clean the shuffle data during iteration

2015-03-20 Thread James
Hello,

Is it possible to delete the shuffle data of a previous iteration once it is
no longer necessary?
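
For context, here is a sketch of the iterative pattern I assume is meant (the
original post shows no code): if the previous iteration's RDD is unpersisted and
no longer referenced, Spark's context cleaner can eventually reclaim the data
associated with it.

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Hypothetical iterative job; names and logic are placeholders.
var current: RDD[(String, Int)] =
  sc.textFile("hdfs:///input").map(line => (line, 1)).reduceByKey(_ + _).cache()

for (i <- 1 to 10) {
  val next = current.map { case (k, v) => (k, v + i) }.reduceByKey(_ + _).cache()
  next.count()                         // materialize the new iteration
  current.unpersist(blocking = false)  // drop the previous iteration's cached blocks
  current = next                       // drop the last reference so its shuffle output can be cleaned
}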

Alcaid


Re: MLlib Spam example gets stuck in Stage X

2015-03-20 Thread Su She
Hello Xiangrui,

I use spark 1.2.0 on cdh 5.3. Thanks!

-Su


On Fri, Mar 20, 2015 at 2:27 PM Xiangrui Meng men...@gmail.com wrote:

 Su, which Spark version did you use? -Xiangrui

 On Thu, Mar 19, 2015 at 3:49 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:
  To get these metrics out, you need to open the driver UI running on port
  4040. In there you will see Stages information, and for each stage you
  can see how much time it is spending on GC etc. In your case the
  parallelism seems to be 4; the higher the parallelism, the more tasks you
  will see.
 
  Thanks
  Best Regards
 
  On Thu, Mar 19, 2015 at 1:15 PM, Su She suhsheka...@gmail.com wrote:
 
  Hi Akhil,
 
  1) How could I see how much time it is spending on stage 1? Or what if,
  like above, it doesn't get past stage 1?
 
  2) How could I check if it's GC time? And where would I increase the
  parallelism for the model? I have a Spark Master and 2 Workers running on
  CDH 5.3... what would the default spark-shell level of parallelism be? I
  thought it would be 3.
 
  Thank you for the help!
 
  -Su
 
 
  On Thu, Mar 19, 2015 at 12:32 AM, Akhil Das ak...@sigmoidanalytics.com
 
  wrote:
 
  Can you see where exactly it is spending time? Since you said it gets to
  Stage 2, you will be able to see how much time it spent on Stage 1. See
  if it's GC time; if so, try increasing the level of parallelism or
  repartition it, e.g. to sc.getDefaultParallelism*3.
 
  Thanks
  Best Regards
 
  On Thu, Mar 19, 2015 at 12:15 PM, Su She suhsheka...@gmail.com
 wrote:
 
  Hello Everyone,
 
  I am trying to run this MLlib example from Learning Spark:
 
  https://github.com/databricks/learning-spark/blob/master/src
 /main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala#L48
 
  Things I'm doing differently:
 
  1) Using spark shell instead of an application
 
  2) instead of their spam.txt and normal.txt I have text files with
 3700
  and 2700 words...nothing huge at all and just plain text
 
  3) I've used numFeatures = 100, 1000 and 10,000
 
  Error: I keep getting stuck when I try to run the model:
 
  val model = new LogisticRegressionWithSGD().run(trainingData)
 
  It will freeze on something like this:
 
  [Stage 1:==
 (1 +
  0) / 4]
 
  Sometimes it's Stage 1, 2 or 3.
 
  I am not sure what I am doing wrong...any help is much appreciated,
  thank you!
 
  -Su
 
 
 
 
 



Spark Streaming Not Reading Messages From Multiple Kafka Topics

2015-03-20 Thread EH
Hi all,

I'm building a Spark Streaming application that continuously reads
multiple Kafka topics at the same time. However, I found a weird issue: it
reads only a few hundred messages and then stops reading any more. If I
change the three topics to only one topic, then it is fine and it continues
to consume. Below is the code I have.

val consumerThreadsPerInputDstream = 1
val topics = Map("raw_0" -> consumerThreadsPerInputDstream,
                 "raw_1" -> consumerThreadsPerInputDstream,
                 "raw_2" -> consumerThreadsPerInputDstream)

val msgs = KafkaUtils.createStream(ssc, "10.10.10.10:2181/hkafka",
  "group01", topics).map(_._2)
...

Why does it no longer consume after a few hundred messages when reading three
topics? How can I resolve this issue?

Thank you for your help,
Eason



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Not-Reading-Messages-From-Multiple-Kafka-Topics-tp22170.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: WebUI on yarn through ssh tunnel affected by AmIpfilter

2015-03-20 Thread Marcelo Vanzin
Instead of opening a tunnel to the Spark web ui port, could you open a
tunnel to the YARN RM web ui instead? That should allow you to
navigate to the Spark application's web ui through the RM proxy, and
hopefully that will work better.

On Fri, Feb 6, 2015 at 9:08 PM, yangqch davidyang...@gmail.com wrote:
 Hi folks,

 I am new to Spark. I just got Spark 1.2 to run on EMR AMI 3.3.1 (Hadoop
 2.4).
 I ssh to the EMR master node and submit the job or start the shell. Everything
 runs well except the web UI.

 In order to see the UI, I used an ssh tunnel which forwards a port on my dev
 machine to the EMR master node's web UI port.

 When I open the web UI at the very beginning of the application (during the
 Spark launch time), the web UI is as nice as shown in many Spark docs.
 However, once the YARN AmIpFilter starts to work, the web UI becomes very
 ugly. No pictures are displayed, only text (just like viewing it in lynx).
 Meanwhile, the spark shell prints "amfilter.AmIpFilter
 (AmIpFilter.java:doFilter(157)) - Could not find proxy-user cookie, so user
 will not be set".

 Can anyone give me some help? Thank you!





 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/WebUI-on-yarn-through-ssh-tunnel-affected-by-AmIpfilter-tp21540.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: IPython notebook command for Spark needs to be updated?

2015-03-20 Thread Matei Zaharia
Feel free to send a pull request to fix the doc (or say which versions it's 
needed in).

Matei

 On Mar 20, 2015, at 6:49 PM, Krishna Sankar ksanka...@gmail.com wrote:
 
 Yep the command-option is gone. No big deal, just add the '%pylab inline' 
 command as part of your notebook.
 Cheers
 k/
 
 On Fri, Mar 20, 2015 at 3:45 PM, cong yue yuecong1...@gmail.com wrote:
 Hello :
 
 I tried the IPython notebook with the following command in my environment.
 
 PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=notebook
 --pylab inline ./bin/pyspark
 
 But it says --pylab inline support has been removed in the newest IPython
 version. The log is as follows:
 ---
 $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=notebook
 --pylab inline ./bin/pyspark
 [E 15:29:43.076 NotebookApp] Support for specifying --pylab on the
 command line has been removed.
 [E 15:29:43.077 NotebookApp] Please use `%pylab inline` or
 `%matplotlib inline` in the notebook itself.
 --
 I am using IPython 3.0.0, and only IPython works in my environment.
 --
 $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=notebook
 --pylab inline ./bin/pyspark
 --
 
 Does somebody have the same issue as me? How did you solve it?
 
 Thanks,
 Cong
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 



Re: IPython notebook command for Spark needs to be updated?

2015-03-20 Thread cong yue
Let me do it now. I appreciate Spark's clear and easy-to-understand
documentation!

The updated command will be like

PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=notebook
./bin/pyspark

When the IPython notebook server is launched, you can create a new Python
2 notebook from the Files tab. Inside the notebook, you can input the
'%pylab inline' command as part of your notebook before you start to
try Spark from the IPython notebook.


Cheers.
Cong

2015-03-20 16:14 GMT-07:00 Matei Zaharia matei.zaha...@gmail.com:
 Feel free to send a pull request to fix the doc (or say which versions it's
 needed in).

 Matei

 On Mar 20, 2015, at 6:49 PM, Krishna Sankar ksanka...@gmail.com wrote:

 Yep the command-option is gone. No big deal, just add the '%pylab inline'
 command as part of your notebook.
 Cheers
 k/

 On Fri, Mar 20, 2015 at 3:45 PM, cong yue yuecong1...@gmail.com wrote:

 Hello :

 I tried ipython notebook with the following command in my enviroment.

 PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=notebook
 --pylab inline ./bin/pyspark

 But it shows  --pylab inline support is removed from ipython newest
 version.
 the log is as :
 ---
 $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=notebook
 --pylab inline ./bin/pyspark
 [E 15:29:43.076 NotebookApp] Support for specifying --pylab on the
 command line has been removed.
 [E 15:29:43.077 NotebookApp] Please use `%pylab inline` or
 `%matplotlib inline` in the notebook itself.
 --
 I am using IPython 3.0.0. and only IPython works in my enviroment.
 --
 $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=notebook
 --pylab inline ./bin/pyspark
 --

 Does somebody have the same issue as mine? How do you solve it?

 Thanks,
 Cong

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: WebUI on yarn through ssh tunnel affected by AmIpfilter

2015-03-20 Thread benbongalon

I ran into a similar issue. What's happening is that when Spark is running
in YARN client mode, YARN automatically launches a Web Application Proxy
(http://archive.cloudera.com/cdh5/cdh/5/hadoop/hadoop-yarn/hadoop-yarn-site/WebApplicationProxy.html)
to reduce hacking attempts. In doing so, it adds the AmIpFilter to the
proxy. You can see this in the example log snippet below:

15/03/20 21:33:14 INFO cluster.YarnClientSchedulerBackend: ApplicationMaster
registered as
Actor[akka.tcp://sparkyar...@ip-172-31-44-228.us-west-2.compute.internal:53028/user/YarnAM#-1897510590]
15/03/20 21:33:14 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter.
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS
-> 172.31.36.22, PROXY_URI_BASES ->
http://172.31.36.22:9046/proxy/application_1426881405719_0009),
/proxy/application_1426881405719_0009
15/03/20 21:33:14 INFO ui.JettyUtils: Adding filter:
org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
15/03/20 21:33:15 INFO yarn.Client: Application report for
application_1426881405719_0009 (state: RUNNING)
15/03/20 21:33:15 INFO yarn.Client: 
 client token: N/A
 diagnostics: N/A
 ApplicationMaster host: ip-172-31-44-228.us-west-2.compute.internal
 ApplicationMaster RPC port: 0
 queue: default
 start time: 1426887190001
 final status: UNDEFINED
 *tracking URL:
http://172.31.36.22:9046/proxy/application_1426881405719_0009/*

While I haven't found a way to disable it (the Spark doc at
http://spark.apache.org/docs/1.2.1/security.html may help), you can view
the Web UI by forwarding the proxy's port (9046 by default). Then point your
browser to the tracking URL with the host IP replaced with localhost, e.g.:

http://localhost:9046/proxy/application_1426881405719_0009

Hope that helps.
Ben





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/WebUI-on-yarn-through-ssh-tunnel-affected-by-AmIpfilter-tp21540p22169.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged

2015-03-20 Thread Manoj Samel
Hi,

Running Spark 1.3 with secured Hadoop.

Spark-shell with Yarn client mode runs without issue when not using Dynamic
Allocation.

When dynamic allocation is turned on, the shell comes up, but the same SQL etc.
causes it to loop.

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.initialExecutors=1
spark.dynamicAllocation.maxExecutors=10
# Set IdleTime low for testing
spark.dynamicAllocation.executorIdleTimeout=60
spark.shuffle.service.enabled=true

The following is the start of the messages; it then keeps looping with
"Requesting 0 new executors".

15/03/20 22:52:42 INFO storage.BlockManagerMaster: Updated info of block
broadcast_1_piece0
15/03/20 22:52:42 INFO spark.SparkContext: Created broadcast 1 from
broadcast at DAGScheduler.scala:839
15/03/20 22:52:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
from Stage 0 (MapPartitionsRDD[3] at mapPartitions at Exchange.scala:100)
15/03/20 22:52:42 INFO cluster.YarnScheduler: Adding task set 0.0 with 1
tasks
15/03/20 22:52:47 INFO spark.ExecutorAllocationManager: Requesting 1 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:52:52 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:52:57 WARN cluster.YarnScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient resources
15/03/20 22:52:57 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:02 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:07 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:12 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:12 WARN cluster.YarnScheduler: Initial job has not accepted
any resources; check your cluster UI to ensure that workers are registered
and have sufficient resources
15/03/20 22:53:17 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:22 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)
15/03/20 22:53:27 INFO spark.ExecutorAllocationManager: Requesting 0 new
executor(s) because tasks are backlogged (new desired total will be 1)


Re: Spark 1.3 Dynamic Allocation - Requesting 0 new executor(s) because tasks are backlogged

2015-03-20 Thread Manoj Samel
Forgot to add: the cluster is otherwise idle, so there should be no
resource issues. Also, the configuration works when not using dynamic
allocation.

On Fri, Mar 20, 2015 at 4:15 PM, Manoj Samel manojsamelt...@gmail.com
wrote:

 Hi,

 Running Spark 1.3 with secured Hadoop.

 Spark-shell with Yarn client mode runs without issue when not using
 Dynamic Allocation.

 When Dynamic allocation is turned on, the shell comes up but same SQL etc.
 causes it to loop.

 spark.dynamicAllocation.enabled=true
 spark.dynamicAllocation.initialExecutors=1
 spark.dynamicAllocation.maxExecutors=10
 # Set IdleTime low for testing
 spark.dynamicAllocation.executorIdleTimeout=60
 spark.shuffle.service.enabled=true

 Following is the start of the messages and then it keeps looping with
 Requesting 0 new executors

 15/03/20 22:52:42 INFO storage.BlockManagerMaster: Updated info of block
 broadcast_1_piece0
 15/03/20 22:52:42 INFO spark.SparkContext: Created broadcast 1 from
 broadcast at DAGScheduler.scala:839
 15/03/20 22:52:42 INFO scheduler.DAGScheduler: Submitting 1 missing tasks
 from Stage 0 (MapPartitionsRDD[3] at mapPartitions at Exchange.scala:100)
 15/03/20 22:52:42 INFO cluster.YarnScheduler: Adding task set 0.0 with 1
 tasks
 15/03/20 22:52:47 INFO spark.ExecutorAllocationManager: Requesting 1 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:52:52 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:52:57 WARN cluster.YarnScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient resources
 15/03/20 22:52:57 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:53:02 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:53:07 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:53:12 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:53:12 WARN cluster.YarnScheduler: Initial job has not accepted
 any resources; check your cluster UI to ensure that workers are registered
 and have sufficient resources
 15/03/20 22:53:17 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:53:22 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)
 15/03/20 22:53:27 INFO spark.ExecutorAllocationManager: Requesting 0 new
 executor(s) because tasks are backlogged (new desired total will be 1)



Registering UDF from a different package fails

2015-03-20 Thread Ravindra
Hi All,

I have all my UDFs defined in classes residing in a different package
from the one where I am instantiating my HiveContext.

I have a register function in my UDF class. I pass the HiveContext to this
function, and in this function I call
hiveContext.registerFunction(myudf, myudf _)

All goes well, but at runtime when I execute the query with val sqlresult =
hiveContext.sql(query)
it doesn't work: sqlresult comes back as null. There is no exception thrown by
Spark and there are no proper logs indicating the error.

But all goes well if I bring my UDF class into the same package where I am
instantiating the HiveContext.
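
For reference, a minimal sketch of the layout being described (package, object,
and function names here are hypothetical):

package com.example.udfs  // a different package from the one creating the HiveContext

import org.apache.spark.sql.hive.HiveContext

object MyUdfs {
  def myudf(s: String): Int = s.length  // hypothetical UDF body

  def register(hiveContext: HiveContext): Unit = {
    hiveContext.registerFunction("myudf", myudf _)
  }
}

// ...and from the package where the HiveContext is instantiated:
//   MyUdfs.register(hiveContext)
//   val sqlresult = hiveContext.sql("SELECT myudf(someColumn) FROM someTable")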

I dug more into the Spark code and found (maybe I am wrong here) that

./sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

has a class named SimpleFunctionRegistry whose lookupFunction does not
throw an error if it doesn't find a function.


Kindly help


Appreciate

Ravi.


about Partition Index

2015-03-20 Thread Long Cheng
Dear all,

Regarding the index of each partition of an RDD, I am wondering whether we
can keep the partition numbering on each physical machine through a hash
partitioning process. For example, take a cluster containing three physical
machines A, B, C (all workers) and an RDD with six partitions, and
assume that the two partitions with indexes 0 and 3 are on A, the partitions
with indexes 1 and 4 are on B, and the ones with indexes 2 and 5 are on C.
Then, if I hash-partition the RDD using partitionBy(new
HashPartitioner(6)), will the newly created RDD still have the same
partition indexes on each machine? Is it possible that the partitions
with indexes 0 and 3 are now on B rather than A? If it is, is there any
method we can use to keep both RDDs with the same numbering
on each physical machine?
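
For what it's worth, here is a small sketch for inspecting which keys end up
under which partition index after the hash partitioning (it only shows the
index-to-key mapping, not which physical machine ends up hosting each partition):

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._

val pairs = sc.parallelize(0 until 60).map(k => (k, k))
val hashed = pairs.partitionBy(new HashPartitioner(6))

hashed.mapPartitionsWithIndex { (idx, iter) =>
  Iterator((idx, iter.map(_._1).toList))  // (partition index, keys in that partition)
}.collect().foreach(println)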

Thanks in advance.

Long

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Accessing AWS S3 in Frankfurt (v4 only - AWS4-HMAC-SHA256)

2015-03-20 Thread Gourav Sengupta
: 7E6F85873D69D14E, x-amz-id-2:
 rGFW+kRfURzz3DlY/m/M8h054MmHu8bxJAtKVHUmov/VY7pBXvtMvbQTXxA7bffpu4xxf4rGmL4=,
 x-amz-region: eu-central-1, Content-Type: application/xml,
 Transfer-Encoding: chunked, Date: Fri, 20 Mar 2015 11:25:31 GMT,
 Connection: close, Server: AmazonS3]
 15/03/20 11:25:32 WARN RestStorageService: Retrying request after
 automatic adjustment of Host endpoint from 
 frankfurt.ingestion.batch.s3.amazonaws.com to 
 frankfurt.ingestion.batch.s3-eu-central-1.amazonaws.com following
 request signing error using AWS request signing version 4: GET
 https://frankfurt.ingestion.batch.s3-eu-central-1.amazonaws.com:443/?max-keys=1prefix=EAN/2015-03-09-72640385/input/HotelImageList.gz/delimiter=/
 HTTP/1.1
 15/03/20 11:25:32 WARN RestStorageService: Retrying request following
 error response: GET
 '/?max-keys=1prefix=EAN/2015-03-09-72640385/input/HotelImageList.gz/delimiter=/'
 -- ResponseCode: 400, ResponseStatus: Bad Request, Request Headers: [Date:
 Fri, 20 Mar 2015 11:25:31 GMT, x-amz-content-sha256:
 e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855, Host:
 frankfurt.ingestion.batch.s3.amazonaws.com, x-amz-date: 20150320T112531Z,
 Authorization: AWS4-HMAC-SHA256
 Credential=XXX_MY_KEY_XXX/20150320/us-east-1/s3/aws4_request,SignedHeaders=date;host;x-amz-content-sha256;x-amz-date,Signature=2098d3175c4304e44be912b770add7594d1d1b44f545c3025be1748672ec60e4],
 Response Headers: [x-amz-request-id: 5CABCD0D3046B267, x-amz-id-2:
 V65tW1lbSybbN3R3RMKBjJFz7xUgJDubSUm/XKXTypg7qfDtkSFRt2I9CMo2Qo2OAA+E44hiazg=,
 Content-Type: application/xml, Transfer-Encoding: chunked, Date: Fri, 20
 Mar 2015 11:25:32 GMT, Connection: close, Server: AmazonS3]
 Exception in thread main org.apache.hadoop.mapred.InvalidInputException:
 Input path does not exist:
 s3n://frankfurt.ingestion.batch/EAN/2015-03-09-72640385/input/HotelImageList.gz


 Do you have any Ideas? Was somebody of you already able to access S3 in
 Frankfurt, if so - how?

 Cheers Ralf





Accessing AWS S3 in Frankfurt (v4 only - AWS4-HMAC-SHA256)

2015-03-20 Thread Ralf Heyde
/HotelImageList.gz/delimiter=/'
-- ResponseCode: 400, ResponseStatus: Bad Request, Request Headers: [Date:
Fri, 20 Mar 2015 11:25:31 GMT, x-amz-content-sha256:
e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855, Host:
frankfurt.ingestion.batch.s3.amazonaws.com, x-amz-date: 20150320T112531Z,
Authorization: AWS4-HMAC-SHA256
Credential=XXX_MY_KEY_XXX/20150320/us-east-1/s3/aws4_request,SignedHeaders=date;host;x-amz-content-sha256;x-amz-date,Signature=2098d3175c4304e44be912b770add7594d1d1b44f545c3025be1748672ec60e4],
Response Headers: [x-amz-request-id: 5CABCD0D3046B267, x-amz-id-2:
V65tW1lbSybbN3R3RMKBjJFz7xUgJDubSUm/XKXTypg7qfDtkSFRt2I9CMo2Qo2OAA+E44hiazg=,
Content-Type: application/xml, Transfer-Encoding: chunked, Date: Fri, 20
Mar 2015 11:25:32 GMT, Connection: close, Server: AmazonS3]
Exception in thread main org.apache.hadoop.mapred.InvalidInputException:
Input path does not exist:
s3n://frankfurt.ingestion.batch/EAN/2015-03-09-72640385/input/HotelImageList.gz


Do you have any ideas? Has any of you already been able to access S3 in
Frankfurt, and if so, how?

Cheers Ralf


How to handle under-performing nodes in the cluster

2015-03-20 Thread Yiannis Gkoufas
Hi all,

I have 6 nodes in the cluster and one of the nodes is clearly
under-performing (the screenshot of the per-node metrics did not come through).

I was wondering what the impact of having such an issue is. Also, what is the
recommended way to work around it?

Thanks a lot,
Yiannis


What is the jvm size when start spark-submit through local mode

2015-03-20 Thread Shuai Zheng
Hi,

 

I am curious: when I start a Spark program in local mode, which parameter
is used to decide the JVM memory size for the executor?

In theory it should be:

--executor-memory 20G

But I remember that local mode only starts the Spark executor in the same process
as the driver, so it should be:

--driver-memory 20G
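
For reference, under that premise (in local mode the executor runs inside the
driver JVM) a submit command would look something like the sketch below; the
class and jar names are placeholders:

./bin/spark-submit --master local[*] --driver-memory 20G --class MyApp ~/myapp.jar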

 

Regards,

 

Shuai



Buffering for Socket streams

2015-03-20 Thread jamborta
Hi all,

We are designing a workflow where we try to stream local files to a socket
streamer that would clean and process the files and write them to HDFS. We
have an issue with bigger files: the streamer cannot keep up with the
data and runs out of memory.

What would be the best way to implement an approach where the socket stream
receiver notifies the sender not to send more data (and to stop reading from
disk too?) just before it might run out of memory?

thanks,




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Buffering-for-Socket-streams-tp22164.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Accessing AWS S3 in Frankfurt (v4 only - AWS4-HMAC-SHA256)

2015-03-20 Thread Ralf Heyde
=/'
 -- ResponseCode: 400, ResponseStatus: Bad Request, Request Headers: [Date:
 Fri, 20 Mar 2015 11:25:31 GMT, Authorization: AWS
 XXX_MY_KEY_XXX:XXX_I_GUESS_SECRET_XXX], Response Headers:
 [x-amz-request-id: 7E6F85873D69D14E, x-amz-id-2:
 rGFW+kRfURzz3DlY/m/M8h054MmHu8bxJAtKVHUmov/VY7pBXvtMvbQTXxA7bffpu4xxf4rGmL4=,
 x-amz-region: eu-central-1, Content-Type: application/xml,
 Transfer-Encoding: chunked, Date: Fri, 20 Mar 2015 11:25:31 GMT,
 Connection: close, Server: AmazonS3]
 15/03/20 11:25:32 WARN RestStorageService: Retrying request after
 automatic adjustment of Host endpoint from 
 frankfurt.ingestion.batch.s3.amazonaws.com to 
 frankfurt.ingestion.batch.s3-eu-central-1.amazonaws.com following
 request signing error using AWS request signing version 4: GET
 https://frankfurt.ingestion.batch.s3-eu-central-1.amazonaws.com:443/?max-keys=1prefix=EAN/2015-03-09-72640385/input/HotelImageList.gz/delimiter=/
 HTTP/1.1
 15/03/20 11:25:32 WARN RestStorageService: Retrying request following
 error response: GET
 '/?max-keys=1prefix=EAN/2015-03-09-72640385/input/HotelImageList.gz/delimiter=/'
 -- ResponseCode: 400, ResponseStatus: Bad Request, Request Headers: [Date:
 Fri, 20 Mar 2015 11:25:31 GMT, x-amz-content-sha256:
 e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855, Host:
 frankfurt.ingestion.batch.s3.amazonaws.com, x-amz-date:
 20150320T112531Z, Authorization: AWS4-HMAC-SHA256
 Credential=XXX_MY_KEY_XXX/20150320/us-east-1/s3/aws4_request,SignedHeaders=date;host;x-amz-content-sha256;x-amz-date,Signature=2098d3175c4304e44be912b770add7594d1d1b44f545c3025be1748672ec60e4],
 Response Headers: [x-amz-request-id: 5CABCD0D3046B267, x-amz-id-2:
 V65tW1lbSybbN3R3RMKBjJFz7xUgJDubSUm/XKXTypg7qfDtkSFRt2I9CMo2Qo2OAA+E44hiazg=,
 Content-Type: application/xml, Transfer-Encoding: chunked, Date: Fri, 20
 Mar 2015 11:25:32 GMT, Connection: close, Server: AmazonS3]
 Exception in thread main
 org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
 s3n://frankfurt.ingestion.batch/EAN/2015-03-09-72640385/input/HotelImageList.gz


 Do you have any Ideas? Was somebody of you already able to access S3 in
 Frankfurt, if so - how?

 Cheers Ralf






Re: version conflict common-net

2015-03-20 Thread Jacob Abraham
Anyone? Or is this question nonsensical and am I doing something
fundamentally wrong?



On Mon, Mar 16, 2015 at 5:33 PM, Jacob Abraham abe.jac...@gmail.com wrote:

 Hi Folks,

 I have a situation where I am getting a version conflict between Java
 libraries used by my application and ones used by Spark.

 Following are the details -

 I use the Spark provided by Cloudera running on a CDH 5.3.2 cluster (Spark
 1.2.0-cdh5.3.2). The library that is causing the conflict is commons-net.

 In our Spark application we use commons-net version 3.3.

 However, I found out that Spark uses commons-net version 2.2.

 Hence, when we try to submit our application using spark-submit, I end up
 getting a NoSuchMethodError:

 ​
 Error starting receiver 5 -
 java.lang.NoSuchMethodError:
 org.apache.commons.net.ftp.FTPClient.setAutodetectUTF8(Z)V

   at ZipStream.onStart(ZipStream.java:55)
   at 
 org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
   at 
 org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
   at 
 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
   at 
 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)

 ​  .

   ​




 Now, if I change the commons-net version to 2.2, the job runs fine (except
 for the fact that some of the features we use from commons-net 3.3 are
 not there).



 How does one resolve such an issue, where Spark uses one set of libraries
 and our user application requires the same set of libraries, but just a
 different version (in my case commons-net 2.2 vs 3.3)?


 I see that there is a setting that I can supply - 
 spark.files.userClassPathFirst, but the documentation says that it is 
 experimental and for us this did not work at all.


 Thanks in advance.


 Regards,

 -Jacob







Re: Visualizing Spark Streaming data

2015-03-20 Thread Irfan Ahmad
Grafana allows pretty slick interactive use patterns, especially with
graphite as the back-end. In a multi-user environment, why not have each
user just build their own independent dashboards and name them under some
simple naming convention?


*Irfan Ahmad*
CTO | Co-Founder | *CloudPhysics* http://www.cloudphysics.com
Best of VMworld Finalist
Best Cloud Management Award
NetworkWorld 10 Startups to Watch
EMA Most Notable Vendor

On Fri, Mar 20, 2015 at 1:06 AM, Harut Martirosyan 
harut.martiros...@gmail.com wrote:

 Hey Jeffrey.
 Thanks for reply.

 I already have something similar, I use Grafana and Graphite, and for
 simple metric streaming we've got all set-up right.

 My question is about interactive patterns. For instance, dynamically
 choose an event to monitor, dynamically choose group-by field or any sort
 of filter, then view results. This is easy when you have 1 user, but if you
 have team of analysts all specifying their own criteria, it becomes hard to
 manage them all.

 On 20 March 2015 at 12:02, Jeffrey Jedele jeffrey.jed...@gmail.com
 wrote:

 Hey Harut,
 I don't think there'll by any general practices as this part heavily
 depends on your environment, skills and what you want to achieve.

 If you don't have a general direction yet, I'd suggest you to have a look
 at Elasticsearch+Kibana. It's very easy to set up, powerful and therefore
 gets a lot of traction currently.

 Regards,
 Jeff

 2015-03-20 8:43 GMT+01:00 Harut harut.martiros...@gmail.com:

 I'm trying to build a dashboard to visualize stream of events coming from
 mobile devices.
 For example, I have event called add_photo, from which I want to
 calculate
 trending tags for added photos for last x minutes. Then I'd like to
 aggregate that by country, etc. I've built the streaming part, which
 reads
 from Kafka, and calculates needed results and get appropriate RDDs, the
 question is now how to connect it to UI.

 Is there any general practices on how to pass parameters to spark from
 some
 custom built UI, how to organize data retrieval, what intermediate
 storages
 to use, etc.

 Thanks in advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 RGRDZ Harut



Re: About the env of Spark1.2

2015-03-20 Thread Ted Yu
bq. Caused by: java.net.UnknownHostException: dhcp-10-35-14-100: Name or
service not known

Can you check your DNS ?

Cheers

On Fri, Mar 20, 2015 at 8:54 PM, tangzilu zilu.t...@hotmail.com wrote:

 Hi All:
 I recently started to deploy Spark 1.2 in my VirtualBox Linux.
 But when I run the command ./spark-shell from the path
 /opt/spark-1.2.1/bin, I get the following result:

 [root@dhcp-10-35-14-100 bin]# ./spark-shell
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 15/03/20 13:56:06 INFO SecurityManager: Changing view acls to: root
 15/03/20 13:56:06 INFO SecurityManager: Changing modify acls to: root
 15/03/20 13:56:06 INFO SecurityManager: SecurityManager: authentication
 disabled; ui acls disabled; users with view permissions: Set(root); users
 with modify permissions: Set(root)
 15/03/20 13:56:06 INFO HttpServer: Starting HTTP Server
 15/03/20 13:56:06 INFO Utils: Successfully started service 'HTTP class
 server' on port 47691.
 Welcome to
     __
  / __/__  ___ _/ /__
 _\ \/ _ \/ _ `/ __/  '_/
/___/ .__/\_,_/_/ /_/\_\   version 1.2.1
   /_/

 Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
 Type in expressions to have them evaluated.
 Type :help for more information.
 java.net.UnknownHostException: dhcp-10-35-14-100: dhcp-10-35-14-100: Name
 or service not known
 at java.net.InetAddress.getLocalHost(InetAddress.java:1473)
 at org.apache.spark.util.Utils$.findLocalIpAddress(Utils.scala:710)
 at
 org.apache.spark.util.Utils$.localIpAddress$lzycompute(Utils.scala:702)
 at org.apache.spark.util.Utils$.localIpAddress(Utils.scala:702)
 at org.apache.spark.HttpServer.uri(HttpServer.scala:158)
 at
 org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:982)
 at $iwC$$iwC.init(console:9)
 at $iwC.init(console:18)
 at init(console:20)
 at .init(console:24)
 at .clinit(console)
 at .init(console:7)
 at .clinit(console)
 at $print(console)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
 org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
 at
 org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
 at
 org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
 at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
 at
 org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
 at
 org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
 at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
 at
 org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:123)
 at
 org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:122)
 at
 org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:270)
 at
 org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:122)
 at
 org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:60)
 at
 org.apache.spark.repl.SparkILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:945)
 at
 org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:147)
 at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:60)
 at
 org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:106)
 at
 org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:60)
 at
 org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:962)
 at
 org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
 at
 org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
 at
 scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
 at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
 at org.apache.spark.repl.Main$.main(Main.scala:31)
 at org.apache.spark.repl.Main.main(Main.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at 

Filesystem closed Exception

2015-03-20 Thread Sea
Hi, all:




When I exit the spark-sql console, the following exception is thrown.

My Spark version is 1.3.0 and my Hadoop version is 2.2.0.


Exception in thread Thread-3 java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:629)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1677)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1106)
at 
org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1102)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1102)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1397)
at 
org.apache.spark.scheduler.EventLoggingListener.stop(EventLoggingListener.scala:196)
at 
org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1388)
at 
org.apache.spark.SparkContext$$anonfun$stop$4.apply(SparkContext.scala:1388)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1388)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.stop(SparkSQLEnv.scala:66)
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$$anon$1.run(SparkSQLCLIDriver.scala:107)‍


About the env of Spark1.2

2015-03-20 Thread tangzilu
Hi All:
I recently started to deploy Spark 1.2 on my VirtualBox Linux VM.
But when I run the command ./spark-shell from /opt/spark-1.2.1/bin, I got the
following result:

[root@dhcp-10-35-14-100 bin]# ./spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/03/20 13:56:06 INFO SecurityManager: Changing view acls to: root
15/03/20 13:56:06 INFO SecurityManager: Changing modify acls to: root
15/03/20 13:56:06 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(root); users with 
modify permissions: Set(root)
15/03/20 13:56:06 INFO HttpServer: Starting HTTP Server
15/03/20 13:56:06 INFO Utils: Successfully started service 'HTTP class server' 
on port 47691.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.1
      /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_75)
Type in expressions to have them evaluated.
Type :help for more information.
java.net.UnknownHostException: dhcp-10-35-14-100: dhcp-10-35-14-100: Name or 
service not known
at java.net.InetAddress.getLocalHost(InetAddress.java:1473)
at org.apache.spark.util.Utils$.findLocalIpAddress(Utils.scala:710)
at 
org.apache.spark.util.Utils$.localIpAddress$lzycompute(Utils.scala:702)
at org.apache.spark.util.Utils$.localIpAddress(Utils.scala:702)
at org.apache.spark.HttpServer.uri(HttpServer.scala:158)
at 
org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:982)
at $iwC$$iwC.init(console:9)
at $iwC.init(console:18)
at init(console:20)
at .init(console:24)
at .clinit(console)
at .init(console:7)
at .clinit(console)
at $print(console)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
at 
org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:123)
at 
org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:122)
at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:270)
at 
org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:122)
at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:60)
at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:945)
at 
org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:147)
at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:60)
at 
org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:106)
at 
org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:60)
at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:962)
at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at 
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
at 
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: 

Re: Spark 1.2. loses often all executors

2015-03-20 Thread Akhil Das
Isn't that a feature? Rather than keep running a buggy pipeline, it just kills
all the executors. You can always handle exceptions with a proper try/catch in
your code, though.

Thanks
Best Regards

On Fri, Mar 20, 2015 at 3:51 PM, mrm ma...@skimlinks.com wrote:

 Hi,

 I recently changed from Spark 1.1. to Spark 1.2., and I noticed that it
 loses all executors whenever I have any Python code bug (like looking up a
 key in a dictionary that does not exist). In earlier versions, it would
 raise an exception but it would not lose all executors.

 Anybody with a similar problem?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-loses-often-all-executors-tp22162.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Job History Server

2015-03-20 Thread Sean Owen
Uh, does that mean HDP shipped Marcelo's uncommitted patch from
SPARK-1537 anyway? Given the discussion there, that seems kinda
aggressive.

On Wed, Mar 18, 2015 at 8:49 AM, Marcelo Vanzin van...@cloudera.com wrote:
 Those classes are not part of standard Spark. You may want to contact
 Hortonworks directly if they're suggesting you use those.

 On Wed, Mar 18, 2015 at 3:30 AM, patcharee patcharee.thong...@uni.no wrote:
 Hi,

 I am using spark 1.3. I would like to use Spark Job History Server. I added
 the following line into conf/spark-defaults.conf

 spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService
 spark.history.provider
 org.apache.spark.deploy.yarn.history.YarnHistoryProvider
 spark.yarn.historyServer.address  sandbox.hortonworks.com:19888

 But got Exception in thread main java.lang.ClassNotFoundException:
 org.apache.spark.deploy.yarn.history.YarnHistoryProvider

 What class is really needed? How to fix it?

 Br,
 Patcharee

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




 --
 Marcelo

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



ShuffleBlockFetcherIterator: Failed to get block(s)

2015-03-20 Thread Eric Friedman
My job crashes with a bunch of these messages in the YARN logs.

What are the appropriate steps in troubleshooting?

15/03/19 23:29:45 ERROR shuffle.RetryingBlockFetcher: Exception while
beginning fetch of 10 outstanding blocks (after 3 retries)

15/03/19 23:29:45 ERROR storage.ShuffleBlockFetcherIterator: Failed to get
block(s) from host:port


Re: Spark 1.2. loses often all executors

2015-03-20 Thread Davies Liu
Maybe this is related to a bug in 1.2 [1], it's fixed in 1.2.2 (not
released), could checkout the 1.2 branch and verify that?

[1] https://issues.apache.org/jira/browse/SPARK-5788
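
Roughly something like this to check out and build it (the build profile is just
an example, match it to your cluster):

git clone https://github.com/apache/spark.git
cd spark
git checkout branch-1.2
./make-distribution.sh --tgz -Phadoop-2.4 -Dhadoop.version=2.4.0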

On Fri, Mar 20, 2015 at 3:21 AM, mrm ma...@skimlinks.com wrote:
 Hi,

 I recently changed from Spark 1.1. to Spark 1.2., and I noticed that it
 loses all executors whenever I have any Python code bug (like looking up a
 key in a dictionary that does not exist). In earlier versions, it would
 raise an exception but it would not lose all executors.

 Anybody with a similar problem?



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-2-loses-often-all-executors-tp22162.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark-submit and multiple files

2015-03-20 Thread Davies Liu
You MUST put --py-files BEFORE main.py, as mentioned in other threads.

On Fri, Mar 20, 2015 at 1:47 AM, Guillaume Charhon
guilla...@databerries.com wrote:
 Hi Davies,

 I am already using --py-files. The system does use the other file. The error
 I am getting is not trivial. Please check the error log.



 On Thu, Mar 19, 2015 at 8:03 PM, Davies Liu dav...@databricks.com wrote:

 You could submit additional Python source via --py-files , for example:

 $ bin/spark-submit --py-files work.py main.py

 On Tue, Mar 17, 2015 at 3:29 AM, poiuytrez guilla...@databerries.com
 wrote:
  Hello guys,
 
  I am having a hard time to understand how spark-submit behave with
  multiple
  files. I have created two code snippets. Each code snippet is composed
  of a
  main.py and work.py. The code works if I paste work.py then main.py in a
  pyspark shell. However both snippets do not work when using spark submit
  and
  generate different errors.
 
  Function add_1 definition outside
  http://www.codeshare.io/4ao8B
  https://justpaste.it/jzvj
 
  Embedded add_1 function definition
  http://www.codeshare.io/OQJxq
  https://justpaste.it/jzvn
 
  I am trying a way to make it work.
 
  Thank you for your support.
 
 
 
  --
  View this message in context:
  http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-and-multiple-files-tp22097.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-20 Thread Yin Huai
spark.sql.shuffle.partitions only controls the number of tasks in the second
stage (the number of reducers). For your case, I'd say that the number of
tasks in the first stage (the number of mappers) will be the number of files
you have.

Actually, have you changed spark.executor.memory (it controls the memory
for an executor of your application)? I did not see it in your original
email. The difference between worker memory and executor memory can be
found at (http://spark.apache.org/docs/1.3.0/spark-standalone.html),

SPARK_WORKER_MEMORY
Total amount of memory to allow Spark applications to use on the machine,
e.g. 1000m, 2g (default: total memory minus 1 GB); note that each
application's individual memory is configured using its
spark.executor.memory property.


On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas johngou...@gmail.com
wrote:

 Actually I realized that the correct way is:

 sqlContext.sql(set spark.sql.shuffle.partitions=1000)

 but I am still experiencing the same behavior/error.

 On 20 March 2015 at 16:04, Yiannis Gkoufas johngou...@gmail.com wrote:

 Hi Yin,

 the way I set the configuration is:

 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 sqlContext.setConf(spark.sql.shuffle.partitions,1000);

 it is the correct way right?
 In the mapPartitions task (the first task which is launched), I get again
 the same number of tasks and again the same error. :(

 Thanks a lot!

 On 19 March 2015 at 17:40, Yiannis Gkoufas johngou...@gmail.com wrote:

 Hi Yin,

 thanks a lot for that! Will give it a shot and let you know.

 On 19 March 2015 at 16:30, Yin Huai yh...@databricks.com wrote:

 Was the OOM thrown during the execution of first stage (map) or the
 second stage (reduce)? If it was the second stage, can you increase the
 value of spark.sql.shuffle.partitions and see if the OOM disappears?

 This setting controls the number of reduces Spark SQL will use and the
 default is 200. Maybe there are too many distinct values and the memory
 pressure on every task (of those 200 reducers) is pretty high. You can
 start with 400 and increase it until the OOM disappears. Hopefully this
 will help.

 Thanks,

 Yin


 On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas johngou...@gmail.com
 wrote:

 Hi Yin,

 Thanks for your feedback. I have 1700 parquet files, sized 100MB each.
 The number of tasks launched is equal to the number of parquet files. Do
 you have any idea on how to deal with this situation?

 Thanks a lot
 On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote:

 Seems there are too many distinct groups processed in a task, which
 trigger the problem.

 How many files do your dataset have and how large is a file? Seems
 your query will be executed with two stages, table scan and map-side
 aggregation in the first stage and the final round of reduce-side
 aggregation in the second stage. Can you take a look at the numbers of
 tasks launched in these two stages?

 Thanks,

 Yin

 On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas 
 johngou...@gmail.com wrote:

 Hi there, I set the executor memory to 8g but it didn't help

 On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com wrote:

 You should probably increase executor memory by setting
 spark.executor.memory.

 Full list of available configurations can be found here
 http://spark.apache.org/docs/latest/configuration.html

 Cheng


 On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:

 Hi there,

 I was trying the new DataFrame API with some basic operations on a
 parquet dataset.
 I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in
 a standalone cluster mode.
 The code is the following:

 val people = sqlContext.parquetFile(/data.parquet);
 val res = people.groupBy(name,date).
 agg(sum(power),sum(supply)).take(10);
 System.out.println(res);

 The dataset consists of 16 billion entries.
 The error I get is java.lang.OutOfMemoryError: GC overhead limit
 exceeded

 My configuration is:

 spark.serializer org.apache.spark.serializer.KryoSerializer
 spark.driver.memory6g
 spark.executor.extraJavaOptions -XX:+UseCompressedOops
 spark.shuffle.managersort

 Any idea how can I workaround this?

 Thanks a lot











RE: Why I didn't see the benefits of using KryoSerializer

2015-03-20 Thread java8964
Hi, Imran:
Thanks for your information.
I found an online benchmark about serialization which compares Java vs Kryo vs
GridGain here:
http://gridgain.blogspot.com/2012/12/java-serialization-good-fast-and-faster.html
From my test result, in the above benchmark case for the SimpleObject, Kryo is
slightly faster than Java serialization, but uses only half of the space of
Java serialization.
So now I understand more about what kind of benefits I should expect from using 
KryoSerializer.
But I have some questions related to Spark SQL. If I use Spark SQL, should I
expect less memory usage? I mean, in Spark SQL everything is controlled by
Spark. If I pass in
-Dspark.serializer=org.apache.spark.serializer.KryoSerializer and save the
table in cache, will it use much less memory? Do I also need to specify
StorageLevel.MEMORY_ONLY_SER if I want to use less memory? Where can I set
that in Spark SQL?
Thanks
Yong

From: iras...@cloudera.com
Date: Fri, 20 Mar 2015 11:54:38 -0500
Subject: Re: Why I didn't see the benefits of using KryoSerializer
To: java8...@hotmail.com
CC: user@spark.apache.org

Hi Yong,
yes I think your analysis is correct.  I'd imagine almost all serializers out 
there will just convert a string to its utf-8 representation.  You might be 
interested in adding compression on top of a serializer, which would probably 
bring the string size down in almost all cases, but then you also need to take 
the time for compression.  Kryo is generally more efficient than the java 
serializer on complicated object types.
I guess I'm still a little surprised that kryo is slower than java 
serialization for you.  You might try setting spark.kryo.referenceTracking to 
false if you are just serializing objects with no circular references.  I think 
that will improve the performance a little, though I dunno how much.
It might be worth running your experiments again with slightly more complicated 
objects and see what you observe.
Imran

On Thu, Mar 19, 2015 at 12:57 PM, java8964 java8...@hotmail.com wrote:



I read the Spark code a little bit, trying to understand my own question.
It looks like the different is really between 
org.apache.spark.serializer.JavaSerializer and 
org.apache.spark.serializer.KryoSerializer, both having the method named 
writeObject.
In my test case, for each line of my text file, it is about 140 bytes of 
String. When either JavaSerializer.writeObject(140 bytes of String) or 
KryoSerializer.writeObject(140 bytes of String), I didn't see difference in the 
underlying OutputStream space usage.
Does this mean that KryoSerializer really doesn't give us any benefit for 
String type? I understand that for primitives types, it shouldn't have any 
benefits, but how about String type?
When we talk about lower the memory using KryoSerializer in spark, under what 
case it can bring significant benefits? It is my first experience with the 
KryoSerializer, so maybe I am total wrong about its usage.
Thanks
Yong 
From: java8...@hotmail.com
To: user@spark.apache.org
Subject: Why I didn't see the benefits of using KryoSerializer
Date: Tue, 17 Mar 2015 12:01:35 -0400




Hi, I am new to Spark. I tried to understand the memory benefits of using 
KryoSerializer.
I have this one box standalone test environment, which is 24 cores with 24G 
memory. I installed Hadoop 2.2 plus Spark 1.2.0.
I put one text file in the hdfs about 1.2G.  Here is the settings in the 
spark-env.sh
export SPARK_MASTER_OPTS=-Dspark.deploy.defaultCores=4
export SPARK_WORKER_MEMORY=32g
export SPARK_DRIVER_MEMORY=2g
export SPARK_EXECUTOR_MEMORY=4g

First test case:
val log=sc.textFile(hdfs://namenode:9000/test_1g/)
log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
log.count()
log.count()

The data is about 3M rows. For the first test case, from the storage in the web 
UI, I can see Size in Memory is 1787M, and Fraction Cached is 70% with 7 
cached partitions. This matched with what I thought, and first count finished 
about 17s, and 2nd count finished about 6s.

2nd test case after restart the spark-shell:
val log=sc.textFile(hdfs://namenode:9000/test_1g/)
log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
log.count()
log.count()

Now from the web UI, I can see Size in Memory is 1231M, and Fraction Cached 
is 100% with 10 cached partitions. It looks like caching the default java 
serialized format reduce the memory usage, but coming with a cost that first 
count finished around 39s and 2nd count finished around 9s. So the job runs 
slower, with less memory usage.

So far I can understand all what happened and the tradeoff.

Now the problem comes with when I tried to test with KryoSerializer
SPARK_JAVA_OPTS=-Dspark.serializer=org.apache.spark.serializer.KryoSerializer /opt/spark/bin/spark-shell
val log=sc.textFile(hdfs://namenode:9000/test_1g/)
log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
log.count()
log.count()
First, I saw that the new serializer setting passed in, as proven in the Spark 

RE: Spark will process _temporary folder on S3 is very slow and always cause failure

2015-03-20 Thread Shuai Zheng
Thanks!

 

Let me update the status.

 

I have copied the DirectOutputCommitter to my local. And set:

 

Conf.set("spark.hadoop.mapred.output.committer.class",
  "org..DirectOutputCommitter")

 

It works perfectly.

 

Thanks everyone :)

 

Regards,

 

Shuai

 

From: Aaron Davidson [mailto:ilike...@gmail.com] 
Sent: Tuesday, March 17, 2015 3:06 PM
To: Imran Rashid
Cc: Shuai Zheng; user@spark.apache.org
Subject: Re: Spark will process _temporary folder on S3 is very slow and always 
cause failure

 

Actually, this is the more relevant JIRA (which is resolved):

https://issues.apache.org/jira/browse/SPARK-3595

 

6352 is about saveAsParquetFile, which is not in use here.

 

Here is a DirectOutputCommitter implementation:

https://gist.github.com/aarondav/c513916e72101bbe14ec

 

and it can be configured in Spark with:

sparkConf.set("spark.hadoop.mapred.output.committer.class",
  classOf[DirectOutputCommitter].getName)

 

On Tue, Mar 17, 2015 at 8:05 AM, Imran Rashid iras...@cloudera.com wrote:

I'm not super familiar w/ S3, but I think the issue is that you want to use a 
different output committers with object stores, that don't have a simple move 
operation.  There have been a few other threads on S3  outputcommitters.  I 
think the most relevant for you is most probably this open JIRA:

 

https://issues.apache.org/jira/browse/SPARK-6352

 

On Fri, Mar 13, 2015 at 5:51 PM, Shuai Zheng szheng.c...@gmail.com wrote:

Hi All,

 

I try to run a sorting on a r3.2xlarge instance on AWS. I just try to run it as 
a single node cluster for test. The data I use to sort is around 4GB and sit on 
S3, output will also on S3.

 

I just connect spark-shell to the local cluster and run the code in the script 
(because I just want a benchmark now).

 

My job is as simple as:

val parquetFile = 
sqlContext.parquetFile(s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,s3n://...,)

parquetFile.registerTempTable(Test)

val sortedResult = sqlContext.sql("SELECT * FROM Test order by time").map { row => row.mkString("\t") }

sortedResult.saveAsTextFile(s3n://myplace,);

 

The job takes around 6 mins to finish the sort when I am monitoring the 
process. After I notice the process stop at: 

 

15/03/13 22:38:27 INFO DAGScheduler: Job 2 finished: saveAsTextFile at 
console:31, took 581.304992 s

 

At that time, the spark actually just write all the data to the _temporary 
folder first, after all sub-tasks finished, it will try to move all the ready 
result from _temporary folder to the final location. This process might be 
quick locally (because it will just be a cut/paste), but it looks like very 
slow on my S3, it takes a few second to move one file (usually there will be 
200 partitions). And then it raise exceptions after it move might be 40-50 
files.

 

org.apache.http.NoHttpResponseException: The target server failed to respond

at 
org.apache.http.impl.conn.DefaultResponseParser.parseHead(DefaultResponseParser.java:101)

at 
org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:252)

at 
org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:281)

at 
org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:247)

at 
org.apache.http.impl.conn.AbstractClientConnAdapter.receiveResponseHeader(AbstractClientConnAdapter.java:219)

 



 

I try several times, but never get the full job finished. I am not sure 
anything wrong here, but I use something very basic and I can see the job has 
finished and all result on the S3 under temporary folder, but then it raise the 
exception and fail. 

 

Any special setting I should do here when deal with S3?

 

I don’t know what is the issue here, I never see MapReduce has similar issue. 
So it could not be S3’s problem.

 

Regards,

 

Shuai

 

 



Re: RDD Blocks skewing to just few executors

2015-03-20 Thread Sean Owen
Hm is data locality a factor here? I don't know.

Just a side note: this doesn't cause OOM errors per se since the cache
won't exceed the % of heap it's allowed. However that will hasten OOM
problems due to tasks using too much memory, of course. The solution
is to get more memory to the tasks or reduce their working set size.
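
For example (values purely illustrative; spark.storage.memoryFraction defaults
to 0.6 in the 1.x line):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // shrink the cache's share of the heap so tasks get more working memory
  .set("spark.storage.memoryFraction", "0.4")
  // and/or give each executor a bigger heap outright
  .set("spark.executor.memory", "8g")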

On Fri, Mar 20, 2015 at 12:32 PM, Alessandro Lulli lu...@di.unipi.it wrote:
 Hi All,

 I'm experiencing the same issue with Spark 1.2.0 (not verified with previous versions).

 Could you please help us on this?

 Thanks
 Alessandro

 On Tue, Nov 18, 2014 at 1:40 AM, mtimper mich...@timper.com wrote:

 Hi I'm running a standalone cluster with 8 worker servers.
 I'm developing a streaming app that is adding new lines of text to several
 different RDDs each batch interval. Each line has a well randomized unique
 identifier that I'm trying to use for partitioning, since the data stream
 does contain duplicates lines. I'm doing partitioning with this:

 val eventsByKey = streamRDD.map { event => (getUID(event), event) }
 val partionedEventsRdd = sparkContext.parallelize(eventsByKey.toSeq)
   .partitionBy(new HashPartitioner(numPartions)).map(e => e._2)

 I'm adding to the existing RDD like with this:

 val mergedRDD = currentRDD.zipPartitions(partionedEventsRdd, true) {
   (currentIter, batchIter) =>
     val uniqEvents = ListBuffer[String]()
     val uids = Map[String, Boolean]()
     Array(currentIter, batchIter).foreach { iter =>
       iter.foreach { event =>
         val uid = getUID(event)
         if (!uids.contains(uid)) {
           uids(uid) = true
           uniqEvents += event
         }
       }
     }
     uniqEvents.iterator
 }

 val count = mergedRDD.count

 The reason I'm doing it this way is that when I was doing:

 val mergedRDD = currentRDD.union(batchRDD).coalesce(numPartions).distinct
 val count = mergedRDD.count

 It would start taking a long time and a lot of shuffles.

 The zipPartitions approach does perform better, though after running an
 hour
 or so I start seeing this
 in the webUI.


 http://apache-spark-user-list.1001560.n3.nabble.com/file/n19112/Executors.png

 As you can see most of the data is skewing to just 2 executors, with 1
 getting more than half the Blocks. These become a hotspot and eventually I
 start seeing OOM errors. I've tried this a half a dozen times and the
 'hot'
 executors changes, but not the skewing behavior.

 Any idea what is going on here?

 Thanks,

 Mike




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Blocks-skewing-to-just-few-executors-tp19112.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



can distinct transform applied on DStream?

2015-03-20 Thread Darren Hoo
val aDstream = ...

val distinctStream = aDstream.transform(_.distinct())

but the elements in distinctStream  are not distinct.

Did I use it wrong?


Re: ShuffleBlockFetcherIterator: Failed to get block(s)

2015-03-20 Thread Imran Rashid
I think you should see some other errors before that, from
NettyBlockTransferService, with a msg like Exception while beginning
fetchBlocks.  There might be a bit more information there.  there are an
assortment of possible causes, but first lets just make sure you have all
the details from the original cause.

On Fri, Mar 20, 2015 at 8:49 AM, Eric Friedman eric.d.fried...@gmail.com
wrote:

 My job crashes with a bunch of these messages in the YARN logs.

 What are the appropriate steps in troubleshooting?

 15/03/19 23:29:45 ERROR shuffle.RetryingBlockFetcher: Exception while
 beginning fetch of 10 outstanding blocks (after 3 retries)

 15/03/19 23:29:45 ERROR storage.ShuffleBlockFetcherIterator: Failed to get
 block(s) from host:port



Re: RDD Blocks skewing to just few executors

2015-03-20 Thread Alessandro Lulli
Hi All,

I'm experiencing the same issue with Spark 1.2.0 (not verified with previous versions).

Could you please help us on this?

Thanks
Alessandro

On Tue, Nov 18, 2014 at 1:40 AM, mtimper mich...@timper.com wrote:

 Hi I'm running a standalone cluster with 8 worker servers.
 I'm developing a streaming app that is adding new lines of text to several
 different RDDs each batch interval. Each line has a well randomized unique
 identifier that I'm trying to use for partitioning, since the data stream
 does contain duplicates lines. I'm doing partitioning with this:

 val eventsByKey = streamRDD.map { event => (getUID(event), event) }
 val partionedEventsRdd = sparkContext.parallelize(eventsByKey.toSeq)
   .partitionBy(new HashPartitioner(numPartions)).map(e => e._2)

 I'm adding to the existing RDD like with this:

 val mergedRDD = currentRDD.zipPartitions(partionedEventsRdd, true) {
   (currentIter, batchIter) =>
     val uniqEvents = ListBuffer[String]()
     val uids = Map[String, Boolean]()
     Array(currentIter, batchIter).foreach { iter =>
       iter.foreach { event =>
         val uid = getUID(event)
         if (!uids.contains(uid)) {
           uids(uid) = true
           uniqEvents += event
         }
       }
     }
     uniqEvents.iterator
 }

 val count = mergedRDD.count

 The reason I'm doing it this way is that when I was doing:

 val mergedRDD = currentRDD.union(batchRDD).coalesce(numPartions).distinct
 val count = mergedRDD.count

 It would start taking a long time and a lot of shuffles.

 The zipPartitions approach does perform better, though after running an
 hour
 or so I start seeing this
 in the webUI.

 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n19112/Executors.png
 

 As you can see most of the data is skewing to just 2 executors, with 1
 getting more than half the Blocks. These become a hotspot and eventually I
 start seeing OOM errors. I've tried this a half a dozen times and the 'hot'
 executors changes, but not the skewing behavior.

 Any idea what is going on here?

 Thanks,

 Mike




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Blocks-skewing-to-just-few-executors-tp19112.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Spark Job History Server

2015-03-20 Thread Zhan Zhang
Hi Patcharee,

It is an alpha feature in the HDP distribution, integrating ATS with the Spark
history server. If you are using upstream Spark, you can configure it as usual
without these settings. But other related configurations are still mandatory,
such as the hdp.version-related ones.
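
For the plain upstream history server, a minimal sketch would be (the HDFS path
and host below are placeholders):

# conf/spark-defaults.conf
spark.eventLog.enabled            true
spark.eventLog.dir                hdfs:///spark-history
spark.history.fs.logDirectory     hdfs:///spark-history
spark.yarn.historyServer.address  historyhost:18080

# then start the server with
./sbin/start-history-server.sh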

Thanks.

Zhan Zhang
 
On Mar 18, 2015, at 3:30 AM, patcharee patcharee.thong...@uni.no wrote:

 Hi,
 
 I am using spark 1.3. I would like to use Spark Job History Server. I added 
 the following line into conf/spark-defaults.conf
 
 spark.yarn.services org.apache.spark.deploy.yarn.history.YarnHistoryService
 spark.history.provider 
 org.apache.spark.deploy.yarn.history.YarnHistoryProvider
 spark.yarn.historyServer.address  sandbox.hortonworks.com:19888
 
 But got Exception in thread main java.lang.ClassNotFoundException: 
 org.apache.spark.deploy.yarn.history.YarnHistoryProvider
 
 What class is really needed? How to fix it?
 
 Br,
 Patcharee
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Visualizing Spark Streaming data

2015-03-20 Thread Harut Martirosyan
But it requires all possible combinations of your filters as separate
metrics; moreover, it can only show time-based information, and you cannot
group by, say, country.

On 20 March 2015 at 19:09, Irfan Ahmad ir...@cloudphysics.com wrote:

 Grafana allows pretty slick interactive use patterns, especially with
 graphite as the back-end. In a multi-user environment, why not have each
 user just build their own independent dashboards and name them under some
 simple naming convention?


 *Irfan Ahmad*
 CTO | Co-Founder | *CloudPhysics* http://www.cloudphysics.com
 Best of VMworld Finalist
 Best Cloud Management Award
 NetworkWorld 10 Startups to Watch
 EMA Most Notable Vendor

 On Fri, Mar 20, 2015 at 1:06 AM, Harut Martirosyan 
 harut.martiros...@gmail.com wrote:

 Hey Jeffrey.
 Thanks for reply.

 I already have something similar, I use Grafana and Graphite, and for
 simple metric streaming we've got all set-up right.

 My question is about interactive patterns. For instance, dynamically
 choose an event to monitor, dynamically choose group-by field or any sort
 of filter, then view results. This is easy when you have 1 user, but if you
 have team of analysts all specifying their own criteria, it becomes hard to
 manage them all.

 On 20 March 2015 at 12:02, Jeffrey Jedele jeffrey.jed...@gmail.com
 wrote:

 Hey Harut,
 I don't think there'll by any general practices as this part heavily
 depends on your environment, skills and what you want to achieve.

 If you don't have a general direction yet, I'd suggest you to have a
 look at Elasticsearch+Kibana. It's very easy to set up, powerful and
 therefore gets a lot of traction currently.

 Regards,
 Jeff

 2015-03-20 8:43 GMT+01:00 Harut harut.martiros...@gmail.com:

 I'm trying to build a dashboard to visualize stream of events coming
 from
 mobile devices.
 For example, I have event called add_photo, from which I want to
 calculate
 trending tags for added photos for last x minutes. Then I'd like to
 aggregate that by country, etc. I've built the streaming part, which
 reads
 from Kafka, and calculates needed results and get appropriate RDDs, the
 question is now how to connect it to UI.

 Is there any general practices on how to pass parameters to spark from
 some
 custom built UI, how to organize data retrieval, what intermediate
 storages
 to use, etc.

 Thanks in advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 RGRDZ Harut





-- 
RGRDZ Harut


Re: version conflict common-net

2015-03-20 Thread Sean Owen
It's not a crazy question, no. I'm having a bit of trouble figuring
out what's happening. Commons Net 2.2 is what's used by Spark. The
error appears to come from Spark. But the error is not finding a
method that did not exist in 2.2. I am not sure what ZipStream is, for
example. This could be a bizarre situation where classloader rules
mean that part of 2.2 and part of 3.3 are being used. For example,
let's say:

- your receiver uses 3.3 classes that are only in 3.3, so they are
found in your user classloader
- 3.3 classes call some class that also existed in 2.2, but those are
found in the Spark classloader.
- 2.2 class doesn't have methods that 3.3 expects

userClassPathFirst is often a remedy. There are several versions of
this flag though. For example you need a different one if on YARN to
have it take effect.

It's worth ruling that out first. If all else fails you can shade 3.3.
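
For example, something along these lines (flag names vary by version and deploy
mode, and MyApp / my-app.jar are just placeholders, so double-check against the
docs for your exact release):

# standalone / Mesos executors on Spark 1.2.x (later releases call it spark.executor.userClassPathFirst)
./bin/spark-submit --conf spark.files.userClassPathFirst=true --class MyApp my-app.jar

# the YARN variant
./bin/spark-submit --master yarn-cluster --conf spark.yarn.user.classpath.first=true --class MyApp my-app.jar

Shading would mean relocating org.apache.commons.net inside your application's
assembly jar (for instance with a maven-shade-plugin or sbt-assembly relocation
rule) so your 3.3 classes no longer collide with Spark's 2.2.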

On Fri, Mar 20, 2015 at 11:44 AM, Jacob Abraham abe.jac...@gmail.com wrote:
 Anyone? or is this question nonsensical... and I am doing something
 fundamentally wrong?



 On Mon, Mar 16, 2015 at 5:33 PM, Jacob Abraham abe.jac...@gmail.com wrote:

 Hi Folks,

 I have a situation where I am getting a version conflict between java
 libraries that is used by my application and ones used by spark.

 Following are the details -

 I use spark provided by Cloudera running on the CDH5.3.2 cluster (Spark
 1.2.0-cdh5.3.2). The library that is causing the conflict is commons-net.

 In our spark application we use commons-net with version 3.3.

 However I found out that spark uses commons-net version 2.2.

 Hence when we try to submit our application using spark-submit, I end up
 getting, a NoSuchMethodError()

 Error starting receiver 5 -


 java.lang.NoSuchMethodError:
 org.apache.commons.net.ftp.FTPClient.setAutodetectUTF8(Z)V

  at ZipStream.onStart(ZipStream.java:55)
  at
 org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
  at
 org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
  at
 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
  at
 org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)

   .






 Now, if I change the commons-net version to 2.2, the job runs fine (except
 for the fact that some of the features we use from commons-net 3.3 are
 not there).



 How does one resolve such an issue where sparks uses one set of libraries
 and our user application requires the same set of libraries, but just a
 different version of it (In my case commons-net 2.2 vs 3.3).


 I see that there is a setting that I can supply -
 spark.files.userClassPathFirst, but the documentation says that it is
 experimental and for us this did not work at all.


 Thanks in advance.


 Regards,

 -Jacob






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-20 Thread Yiannis Gkoufas
Hi Yin,

the way I set the configuration is:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.setConf("spark.sql.shuffle.partitions", "1000");

it is the correct way right?
In the mapPartitions task (the first task which is launched), I get again
the same number of tasks and again the same error. :(

Thanks a lot!

On 19 March 2015 at 17:40, Yiannis Gkoufas johngou...@gmail.com wrote:

 Hi Yin,

 thanks a lot for that! Will give it a shot and let you know.

 On 19 March 2015 at 16:30, Yin Huai yh...@databricks.com wrote:

 Was the OOM thrown during the execution of first stage (map) or the
 second stage (reduce)? If it was the second stage, can you increase the
 value of spark.sql.shuffle.partitions and see if the OOM disappears?

 This setting controls the number of reduces Spark SQL will use and the
 default is 200. Maybe there are too many distinct values and the memory
 pressure on every task (of those 200 reducers) is pretty high. You can
 start with 400 and increase it until the OOM disappears. Hopefully this
 will help.

 Thanks,

 Yin


 On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas johngou...@gmail.com
 wrote:

 Hi Yin,

 Thanks for your feedback. I have 1700 parquet files, sized 100MB each.
 The number of tasks launched is equal to the number of parquet files. Do
 you have any idea on how to deal with this situation?

 Thanks a lot
 On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote:

 Seems there are too many distinct groups processed in a task, which
 trigger the problem.

 How many files do your dataset have and how large is a file? Seems your
 query will be executed with two stages, table scan and map-side aggregation
 in the first stage and the final round of reduce-side aggregation in the
 second stage. Can you take a look at the numbers of tasks launched in these
 two stages?

 Thanks,

 Yin

 On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas johngou...@gmail.com
  wrote:

 Hi there, I set the executor memory to 8g but it didn't help

 On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com wrote:

 You should probably increase executor memory by setting
 spark.executor.memory.

 Full list of available configurations can be found here
 http://spark.apache.org/docs/latest/configuration.html

 Cheng


 On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:

 Hi there,

 I was trying the new DataFrame API with some basic operations on a
 parquet dataset.
 I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in a
 standalone cluster mode.
 The code is the following:

 val people = sqlContext.parquetFile(/data.parquet);
 val res = people.groupBy(name,date).
 agg(sum(power),sum(supply)).take(10);
 System.out.println(res);

 The dataset consists of 16 billion entries.
 The error I get is java.lang.OutOfMemoryError: GC overhead limit
 exceeded

 My configuration is:

 spark.serializer org.apache.spark.serializer.KryoSerializer
 spark.driver.memory6g
 spark.executor.extraJavaOptions -XX:+UseCompressedOops
 spark.shuffle.managersort

 Any idea how can I workaround this?

 Thanks a lot









Re: Error communicating with MapOutputTracker

2015-03-20 Thread Imran Rashid
Hi Thomas,

sorry for such a late reply.  I don't have any super-useful advice, but
this seems like something that is important to follow up on.  to answer
your immediate question, No, there should not be any hard limit to the
number of tasks that MapOutputTracker can handle.  Though of course as
things get bigger, the overheads increase which is why you might hit
timeouts.

Two other minor suggestions:
(1) increase spark.akka.askTimeout -- thats the timeout you are running
into, it defaults to 30 seconds
(2) as you've noted, you've needed to play w/ other timeouts b/c of long GC
pauses -- it's possible some GC tuning might help, though it's a bit of a
black art so it's hard to say what you can try.  You could always try
Concurrent Mark Sweep (CMS) to avoid the long pauses, but of course that will
probably hurt overall performance.
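
Concretely, something like this in spark-defaults.conf (the values are only
illustrative):

# seconds; the default is 30
spark.akka.askTimeout           120
# CMS to shorten the long pauses, at some throughput cost
spark.executor.extraJavaOptions -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails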

can you share any more details of what you are trying to do?

Since you're fetching shuffle blocks in a shuffle map task, I guess you've
got two shuffles back-to-back, eg.
someRDD.reduceByKey{...}.map{...}.filter{...}.combineByKey{...}.  Do you
expect to be doing a lot of GC in between the two shuffles? E.g., in the
little example I have, if there were lots of objects being created in the
map and filter steps that will make it out of the eden space.  One possible
solution to this would be to force the first shuffle to complete, before
running any of the subsequent transformations, eg. by forcing
materialization to the cache first

val intermediateRDD = someRDD.reduceByKey{...}.persist(StorageLevel.DISK_ONLY)
intermediateRDD.count() // force the shuffle to complete, without trying to
// do our complicated downstream logic at the same time

val finalResult = intermediateRDD.map{...}.filter{...}.combineByKey{...}

Also, can you share your data size?  Do you expect the shuffle to be
skewed, or do you think it will be well-balanced?  Not that I'll have any
suggestions for you based on the answer, but it may help us reproduce it
and try to fix whatever the root cause is.

thanks,
Imran



On Wed, Mar 4, 2015 at 12:30 PM, Thomas Gerber thomas.ger...@radius.com
wrote:

 I meant spark.default.parallelism of course.

 On Wed, Mar 4, 2015 at 10:24 AM, Thomas Gerber thomas.ger...@radius.com
 wrote:

 Follow up:
 We re-retried, this time after *decreasing* spark.parallelism. It was set
 to 16000 before, (5 times the number of cores in our cluster). It is now
 down to 6400 (2 times the number of cores).

 And it got past the point where it failed before.

 Does the MapOutputTracker have a limit on the number of tasks it can
 track?


 On Wed, Mar 4, 2015 at 8:15 AM, Thomas Gerber thomas.ger...@radius.com
 wrote:

 Hello,

 We are using spark 1.2.1 on a very large cluster (100 c3.8xlarge
 workers). We use spark-submit to start an application.

 We got the following error which leads to a failed stage:

 Job aborted due to stage failure: Task 3095 in stage 140.0 failed 4 times, 
 most recent failure: Lost task 3095.3 in stage 140.0 (TID 308697, 
 ip-10-0-12-88.ec2.internal): org.apache.spark.SparkException: Error 
 communicating with MapOutputTracker


 We tried the whole application again, and it failed on the same stage
 (but it got more tasks completed on that stage) with the same error.

 We then looked at executors stderr, and all show similar logs, on both
 runs (see below). As far as we can tell, executors and master have disk
 space left.

 *Any suggestion on where to look to understand why the communication
 with the MapOutputTracker fails?*

 Thanks
 Thomas
 
 In case it matters, our akka settings:
 spark.akka.frameSize 50
 spark.akka.threads 8
 // those below are 10* the default, to cope with large GCs
 spark.akka.timeout 1000
 spark.akka.heartbeat.pauses 6
 spark.akka.failure-detector.threshold 3000.0
 spark.akka.heartbeat.interval 1

 Appendix: executor logs, where it starts going awry

 15/03/04 11:45:00 INFO CoarseGrainedExecutorBackend: Got assigned task 
 298525
 15/03/04 11:45:00 INFO Executor: Running task 3083.0 in stage 140.0 (TID 
 298525)
 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(1473) called with 
 curMem=5543008799, maxMem=18127202549
 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339_piece0 stored as 
 bytes in memory (estimated size 1473.0 B, free 11.7 GB)
 15/03/04 11:45:00 INFO BlockManagerMaster: Updated info of block 
 broadcast_339_piece0
 15/03/04 11:45:00 INFO TorrentBroadcast: Reading broadcast variable 339 
 took 224 ms
 15/03/04 11:45:00 INFO MemoryStore: ensureFreeSpace(2536) called with 
 curMem=5543010272, maxMem=18127202549
 15/03/04 11:45:00 INFO MemoryStore: Block broadcast_339 stored as values in 
 memory (estimated size 2.5 KB, free 11.7 GB)
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Don't have map outputs for 
 shuffle 18, fetching them
 15/03/04 11:45:00 INFO MapOutputTrackerWorker: Doing the fetch; tracker 
 actor = 
 Actor[akka.tcp://sparkDriver@ip-10-0-10-17.ec2.internal:52380/user/MapOutputTracker#-2057016370]
 15/03/04 11:45:00 INFO 

Re: FetchFailedException: Adjusted frame length exceeds 2147483647: 12716268407 - discarded

2015-03-20 Thread Imran Rashid
I think you are running into a combo of

https://issues.apache.org/jira/browse/SPARK-5928
and
https://issues.apache.org/jira/browse/SPARK-5945

The standard solution is to just increase the number of partitions you are
creating. textFile(), reduceByKey(), and sortByKey() all take an optional
second argument, where you can specify the number of partitions you use.
It looks like it's using spark.default.parallelism right now, which will
usually be the number of cores in your cluster (not sure what that is in your
case).  The exception you gave shows you're about 6x over the limit in at
least this one case, so I'd start with at least 10x the number of
partitions you have now, and increase until it works (or you run into some
other problem from too many partitions ...)

I'd also strongly suggest doing the filter before you do the sortByKey --
no reason to force all that data through the sort if you're going to throw a
lot of it away.  It's not completely clear where you are hitting the error now;
that change alone might even solve your problem.
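
Putting both suggestions together, a rough sketch of the reworked pipeline (the
partition count is only illustrative, and I'm guessing at the intent of the
original filter):

val numParts = 2000  // start high and tune; spark.default.parallelism is the current fallback
val hgfasta = sc.textFile(args(0), numParts)
val counts = hgfasta.flatMap(r => r.sliding(args(2).toInt))
  .map(x => (x, 1))
  .reduceByKey(_ + _, numParts)
val filtered = counts.filter(kv => kv._2 > 5)  // filter before the expensive sort
val sorted = filtered.map { case (k, v) => (v, k) }
  .sortByKey(ascending = false, numPartitions = numParts)
  .map { case (v, k) => (k, v) }
sorted.map(kv => kv._1 + ", " + kv._2.toLong).saveAsTextFile(args(1))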

hope this helps,
Imran


On Thu, Mar 19, 2015 at 5:28 PM, roni roni.epi...@gmail.com wrote:

 I get 2 types of error -
 -org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
 location for shuffle 0 and
 FetchFailedException: Adjusted frame length exceeds 2147483647:
 12716268407 - discarded

 Spar keeps re-trying to submit the code and keeps getting this error.

 My file on which I am finding  the sliding window strings is 500 MB  and I
 am doing it with length = 150.
 It woks fine till length is 100.

 This is my code -
  val hgfasta = sc.textFile(args(0)) // read the fasta file
  val kCount = hgfasta.flatMap(r => r.sliding(args(2).toInt))
  val kmerCount = kCount.map(x => (x, 1)).reduceByKey(_ + _).map { case
  (x, y) => (y, x) }.sortByKey(false).map { case (i, j) => (j, i) }

  val filtered = kmerCount.filter(kv => kv._2 > 5)
  filtered.map(kv => kv._1 + ", " +
  kv._2.toLong).saveAsTextFile(args(1))

   }
 It gets stuck and flat map and save as Text file  Throws
 -org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
 location for shuffle 0 and

 org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 
 2147483647: 12716268407 - discarded
   at 
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
   at 
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
   at 
 org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)





Re: Visualizing Spark Streaming data

2015-03-20 Thread Roger Hoover
Hi Harut,

Jeff's right that Kibana + Elasticsearch can take you quite far out of the
box.  Depending on your volume of data, you may only be able to keep recent
data around though.

Another option that is custom-built for handling many dimensions at query
time (not as separate metrics) is Druid (http://druid.io/).  It supports
the Lambda architecture.  It does real-time indexing from Kafka and after a
configurable window, hands off shards to historical nodes.  The historical
shards can also be recomputed in batch mode to fixed up duplicates or late
data.

I wrote a plugin for Grafana that talks to Druid.  It doesn't support all
of Druid's rich query API but it can get you pretty far.

https://github.com/Quantiply/grafana-plugins/

Cheers,

Roger



On Fri, Mar 20, 2015 at 9:11 AM, Harut Martirosyan 
harut.martiros...@gmail.com wrote:

 But it requires all possible combinations of your filters as separate
 metrics, moreover, it only can show time based information, you cannot
 group by say country.

 On 20 March 2015 at 19:09, Irfan Ahmad ir...@cloudphysics.com wrote:

 Grafana allows pretty slick interactive use patterns, especially with
 graphite as the back-end. In a multi-user environment, why not have each
 user just build their own independent dashboards and name them under some
 simple naming convention?


 *Irfan Ahmad*
 CTO | Co-Founder | *CloudPhysics* http://www.cloudphysics.com
 Best of VMworld Finalist
 Best Cloud Management Award
 NetworkWorld 10 Startups to Watch
 EMA Most Notable Vendor

 On Fri, Mar 20, 2015 at 1:06 AM, Harut Martirosyan 
 harut.martiros...@gmail.com wrote:

 Hey Jeffrey.
 Thanks for reply.

 I already have something similar, I use Grafana and Graphite, and for
 simple metric streaming we've got all set-up right.

 My question is about interactive patterns. For instance, dynamically
 choose an event to monitor, dynamically choose group-by field or any sort
 of filter, then view results. This is easy when you have 1 user, but if you
 have team of analysts all specifying their own criteria, it becomes hard to
 manage them all.

 On 20 March 2015 at 12:02, Jeffrey Jedele jeffrey.jed...@gmail.com
 wrote:

 Hey Harut,
 I don't think there'll by any general practices as this part heavily
 depends on your environment, skills and what you want to achieve.

 If you don't have a general direction yet, I'd suggest you to have a
 look at Elasticsearch+Kibana. It's very easy to set up, powerful and
 therefore gets a lot of traction currently.

 Regards,
 Jeff

 2015-03-20 8:43 GMT+01:00 Harut harut.martiros...@gmail.com:

 I'm trying to build a dashboard to visualize stream of events coming
 from
 mobile devices.
 For example, I have event called add_photo, from which I want to
 calculate
 trending tags for added photos for last x minutes. Then I'd like to
 aggregate that by country, etc. I've built the streaming part, which
 reads
 from Kafka, and calculates needed results and get appropriate RDDs, the
 question is now how to connect it to UI.

 Is there any general practices on how to pass parameters to spark from
 some
 custom built UI, how to organize data retrieval, what intermediate
 storages
 to use, etc.

 Thanks in advance.




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





 --
 RGRDZ Harut





 --
 RGRDZ Harut



Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-20 Thread Yiannis Gkoufas
Actually I realized that the correct way is:

sqlContext.sql("set spark.sql.shuffle.partitions=1000")

but I am still experiencing the same behavior/error.

On 20 March 2015 at 16:04, Yiannis Gkoufas johngou...@gmail.com wrote:

 Hi Yin,

 the way I set the configuration is:

 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 sqlContext.setConf(spark.sql.shuffle.partitions,1000);

 it is the correct way right?
 In the mapPartitions task (the first task which is launched), I get again
 the same number of tasks and again the same error. :(

 Thanks a lot!

 On 19 March 2015 at 17:40, Yiannis Gkoufas johngou...@gmail.com wrote:

 Hi Yin,

 thanks a lot for that! Will give it a shot and let you know.

 On 19 March 2015 at 16:30, Yin Huai yh...@databricks.com wrote:

 Was the OOM thrown during the execution of first stage (map) or the
 second stage (reduce)? If it was the second stage, can you increase the
 value of spark.sql.shuffle.partitions and see if the OOM disappears?

 This setting controls the number of reduces Spark SQL will use and the
 default is 200. Maybe there are too many distinct values and the memory
 pressure on every task (of those 200 reducers) is pretty high. You can
 start with 400 and increase it until the OOM disappears. Hopefully this
 will help.

 Thanks,

 Yin


 On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas johngou...@gmail.com
 wrote:

 Hi Yin,

 Thanks for your feedback. I have 1700 parquet files, sized 100MB each.
 The number of tasks launched is equal to the number of parquet files. Do
 you have any idea on how to deal with this situation?

 Thanks a lot
 On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote:

 Seems there are too many distinct groups processed in a task, which
 trigger the problem.

 How many files do your dataset have and how large is a file? Seems
 your query will be executed with two stages, table scan and map-side
 aggregation in the first stage and the final round of reduce-side
 aggregation in the second stage. Can you take a look at the numbers of
 tasks launched in these two stages?

 Thanks,

 Yin

 On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas 
 johngou...@gmail.com wrote:

 Hi there, I set the executor memory to 8g but it didn't help

 On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com wrote:

 You should probably increase executor memory by setting
 spark.executor.memory.

 Full list of available configurations can be found here
 http://spark.apache.org/docs/latest/configuration.html

 Cheng


 On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:

 Hi there,

 I was trying the new DataFrame API with some basic operations on a
 parquet dataset.
 I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in
 a standalone cluster mode.
 The code is the following:

 val people = sqlContext.parquetFile(/data.parquet);
 val res = people.groupBy(name,date).
 agg(sum(power),sum(supply)).take(10);
 System.out.println(res);

 The dataset consists of 16 billion entries.
 The error I get is java.lang.OutOfMemoryError: GC overhead limit
 exceeded

 My configuration is:

 spark.serializer org.apache.spark.serializer.KryoSerializer
 spark.driver.memory 6g
 spark.executor.extraJavaOptions -XX:+UseCompressedOops
 spark.shuffle.manager sort

 Any idea how can I workaround this?

 Thanks a lot










Re: Why I didn't see the benefits of using KryoSerializer

2015-03-20 Thread Imran Rashid
Hi Yong,

yes I think your analysis is correct.  I'd imagine almost all serializers
out there will just convert a string to its utf-8 representation.  You
might be interested in adding compression on top of a serializer, which
would probably bring the string size down in almost all cases, but then you
also need to take the time for compression.  Kryo is generally more
efficient than the java serializer on complicated object types.

I guess I'm still a little surprised that kryo is slower than java
serialization for you.  You might try setting
spark.kryo.referenceTracking to false if you are just serializing objects
with no circular references.  I think that will improve the performance a
little, though I dunno how much.
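
For what it's worth, a minimal sketch of a Kryo setup along those lines (the
MyRecord class is just a placeholder for whatever complicated type you cache;
registration and compression are optional knobs, not requirements):

  import org.apache.spark.SparkConf

  // Placeholder type standing in for a more complicated cached object.
  case class MyRecord(id: Long, tags: Seq[String])

  val conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.referenceTracking", "false") // only safe with no circular references
    .set("spark.rdd.compress", "true")            // trade CPU for a smaller MEMORY_ONLY_SER footprint
    .registerKryoClasses(Array(classOf[MyRecord]))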

It might be worth running your experiments again with slightly more
complicated objects and see what you observe.

Imran


On Thu, Mar 19, 2015 at 12:57 PM, java8964 java8...@hotmail.com wrote:

 I read the Spark code a little bit, trying to understand my own question.

 It looks like the difference is really between
 org.apache.spark.serializer.JavaSerializer and
 org.apache.spark.serializer.KryoSerializer, both having the method named
 writeObject.

 In my test case, for each line of my text file, it is about 140 bytes of
 String. When either JavaSerializer.writeObject(140 bytes of String) or
 KryoSerializer.writeObject(140 bytes of String), I didn't see a difference in
 the underlying OutputStream space usage.

 Does this mean that KryoSerializer really doesn't give us any benefit for
 String type? I understand that for primitive types, it shouldn't have any
 benefits, but how about String type?

 When we talk about lowering memory usage with KryoSerializer in Spark, in
 what cases can it bring significant benefits? It is my first experience with
 the KryoSerializer, so maybe I am totally wrong about its usage.

 Thanks

 Yong

 --
 From: java8...@hotmail.com
 To: user@spark.apache.org
 Subject: Why I didn't see the benefits of using KryoSerializer
 Date: Tue, 17 Mar 2015 12:01:35 -0400


 Hi, I am new to Spark. I tried to understand the memory benefits of using
 KryoSerializer.

 I have this one box standalone test environment, which is 24 cores with
 24G memory. I installed Hadoop 2.2 plus Spark 1.2.0.

 I put one text file in the hdfs about 1.2G.  Here is the settings in the
 spark-env.sh

 export SPARK_MASTER_OPTS=-Dspark.deploy.defaultCores=4
 export SPARK_WORKER_MEMORY=32g
 export SPARK_DRIVER_MEMORY=2g
 export SPARK_EXECUTOR_MEMORY=4g

 First test case:
 val log=sc.textFile("hdfs://namenode:9000/test_1g/")
 log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
 log.count()
 log.count()

 The data is about 3M rows. For the first test case, from the storage in
 the web UI, I can see Size in Memory is 1787M, and Fraction Cached is
 70% with 7 cached partitions.
 This matched with what I thought, and first count finished about 17s, and
 2nd count finished about 6s.

 2nd test case after restart the spark-shell:
 val log=sc.textFile("hdfs://namenode:9000/test_1g/")
 log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
 log.count()
 log.count()

 Now from the web UI, I can see Size in Memory is 1231M, and Fraction
 Cached is 100% with 10 cached partitions. It looks like caching in the
 default java serialized format reduces the memory usage, but comes with a
 cost: the first count finished around 39s and the 2nd count finished around
 9s. So the job runs slower, with less memory usage.

 So far I can understand all what happened and the tradeoff.

 Now the problem comes with when I tried to test with KryoSerializer

 SPARK_JAVA_OPTS=-Dspark.serializer=org.apache.spark.serializer.KryoSerializer
 /opt/spark/bin/spark-shell
 val log=sc.textFile("hdfs://namenode:9000/test_1g/")
 log.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
 log.count()
 log.count()

 First, I saw that the new serializer setting was passed in, as shown in the
 Spark Properties section of the Environment tab:

 spark.driver.extraJavaOptions
   -Dspark.serializer=org.apache.spark.serializer.KryoSerializer

 This entry is not there for the first 2 test cases.
 But in the web UI of Storage, the Size in Memory is 1234M, with 100%
 Fraction Cached and 10 cached partitions. The first count took 46s and
 2nd count took 23s.

 I don't get a much smaller memory size as I expected, but a longer run time
 for both counts. Did I do anything wrong? Why does the memory footprint of
 MEMORY_ONLY_SER with KryoSerializer still use the same amount of space as
 the default Java serializer, with a worse duration?

 Thanks

 Yong



RE: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

2015-03-20 Thread Shuai Zheng
Below is the output:

 

core file size  (blocks, -c) 0
data seg size   (kbytes, -d) unlimited
scheduling priority (-e) 0
file size   (blocks, -f) unlimited
pending signals (-i) 1967947
max locked memory   (kbytes, -l) 64
max memory size (kbytes, -m) unlimited
open files  (-n) 2024
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority  (-r) 0
stack size  (kbytes, -s) 8192
cpu time   (seconds, -t) unlimited
max user processes  (-u) 1967947
virtual memory  (kbytes, -v) unlimited
file locks  (-x) unlimited

 

I have set the max open files to 2024 with "ulimit -n 2024", but I get the
same issue. I am not sure whether that is a reasonable setting.

 

Actually I am doing a loop; each iteration sorts only about 3GB of data. It
runs very quickly in the first iteration and slows down in the second. In each
iteration I start and destroy the context (because I want to clean up the temp
files created under the tmp folder, which take a lot of space). Just default
settings otherwise.

 

My logic:

 

for (each iteration) {
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  val data = sqlContext.parquetFile(...)
  // sortByKey on the loaded data
  sc.stop()
}

 

And I run on EC2 c3.8xlarge, Amazon Linux AMI 2014.09.2 (HVM).

 

From: java8964 [mailto:java8...@hotmail.com] 
Sent: Friday, March 20, 2015 3:54 PM
To: user@spark.apache.org
Subject: RE: com.esotericsoftware.kryo.KryoException: java.io.IOException:
File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

 

Do you think it could be the ulimit for the user running Spark on your nodes?

 

Can you run ulimit -a under the user who is running spark on the executor
node? Does the result make sense for the data you are trying to process?

 

Yong

 

  _  

From: szheng.c...@gmail.com
To: user@spark.apache.org
Subject: com.esotericsoftware.kryo.KryoException: java.io.IOException: File
too large vs FileNotFoundException (Too many open files) on spark 1.2.1
Date: Fri, 20 Mar 2015 15:28:26 -0400

Hi All,

 

I try to run a simple sort by on 1.2.1, and it always gives me the two
errors below:

 

1, 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID
35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException:
/tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6
b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19d
d-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b5326782
6 (Too many open files)

 

And then I switch to:

conf.set("spark.shuffle.consolidateFiles", "true")

    .set("spark.shuffle.manager", "SORT")

 

Then I get the error:

 

Exception in thread main org.apache.spark.SparkException: Job aborted due
to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure:
Lost task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal):
com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large

at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)

 

I roughly know the first issue is because the Spark shuffle creates too many
local temp files (and I don't know the solution, because it looks like my
workaround also causes other issues), but I am not sure what the second
error means.

 

Anyone knows the solution for both cases?

 

Regards,

 

Shuai



Re: Spark SQL UDT Kryo serialization, Unable to find class

2015-03-20 Thread Michael Armbrust
You probably don't cause a shuffle (which requires serialization) unless
there is a join or group by.

It's possible that we need to pass the Spark class loader to Kryo when
creating a new instance (you can get it from Utils, I believe).  We never
ran into this problem since this API is not public yet.  I'd start by
looking in SparkSqlSerializer.
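
As a rough illustration of that direction (this is only a sketch, not the
actual SparkSqlSerializer code):

  import com.esotericsoftware.kryo.Kryo

  // When a Kryo instance is created, hand it a class loader that can see
  // application classes, so types like com.gis.io.GeometryWritable resolve.
  val kryo = new Kryo()
  kryo.setClassLoader(Thread.currentThread().getContextClassLoader)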
On Mar 18, 2015 1:13 AM, Zia Ur Rehman Kayani zia.kay...@platalytics.com
wrote:

 Thanks for your reply. I've tried this as well, by passing the JAR file
 path to *spark.executor.extraClassPath*, but it doesn't help me out.
 Actually, I've figured out that the custom UDT works fine if I use only one
 RDD (table); the issue arises when we join two or more RDDs. According to
 this https://datastax-oss.atlassian.net/browse/SPARKC-23, it is a bug
 when we use a custom Row and a JOIN. But the solution proposed isn't
 working in my case.

 Any clue ?


 On Tue, Mar 17, 2015 at 10:19 PM, Michael Armbrust mich...@databricks.com
  wrote:

 I'll caution you that this is not a stable public API.

 That said, it seems that the issue is that you have not copied the jar
 file containing your class to all of the executors.  You should not need to
 do any special configuration of serialization (you can't for SQL, as we
 hard code it for performance, since we generally know all the types that
 are going to be shipped)

 On Tue, Mar 17, 2015 at 5:17 AM, zia_kayani zia.kay...@platalytics.com
 wrote:

 Hi,
 I want to introduce a custom type for SchemaRDD. I'm following this
 
 https://github.com/apache/spark/blob/branch-1.2/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala
 
 example. But I'm having Kryo Serialization issues, here is stack trace:

 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0 in
 stage 6.0 failed 1 times, most recent failure:
 Lost task 0.0 in stage 6.0 (TID 22, localhost):
 *com.esotericsoftware.kryo.KryoException: Unable to find class:
 com.gis.io.GeometryWritable*
 Serialization trace:
 value (org.apache.spark.sql.catalyst.expressions.MutableAny)
 values (org.apache.spark.sql.catalyst.expressions.SpecificMutableRow)
at

 com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at

 com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
at

 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599)
at

 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at

 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
at

 com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
at

 com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at

 com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at
 com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
at
 com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
at
 com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
at

 org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
at

 org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at
 org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at

 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at

 org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at

 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at

 org.apache.spark.sql.execution.joins.HashedRelation$.apply(HashedRelation.scala:80)
at

 org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:46)
at

 org.apache.spark.sql.execution.joins.ShuffledHashJoin$$anonfun$execute$1.apply(ShuffledHashJoin.scala:45)
at

 org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
at
 

Spark per app logging

2015-03-20 Thread Udit Mehta
Hi,

We have Spark set up such that various users run multiple jobs at the same
time. Currently all the logs go to one file specified in the
log4j.properties.
Is it possible to configure log4j in spark for per app/user logging instead
of sending all logs to 1 file mentioned in the log4j.properties?

Thanks
Udit


Matching Spark application metrics data to App Id

2015-03-20 Thread Judy Nash
Hi,

I want to get telemetry metrics on Spark app activities, such as run time and
JVM activity.

Using Spark Metrics I am able to get the following sample data point for an
app:
type=GAUGE, name=application.SparkSQL::headnode0.1426626495312.runtime_ms,
value=414873

How can I match this data point to the App Id? (i.e. app-20150317210815-0001)
The Spark app name is not a unique identifier.
1426626495312 appears to be unique, but I am unable to see how it is related
to the App Id.
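
(As a hedged aside: assuming a Spark version where SparkContext exposes
applicationId, one way to correlate these is to log the identifiers together
from the driver at startup and join on that record afterwards, e.g.

  // Record app name, app id and start time so metric names can later be
  // matched back to the app id. sc is the driver's SparkContext.
  println(s"appName=${sc.appName} appId=${sc.applicationId} startTime=${sc.startTime}")

This is only a sketch, not something the metrics system does for you.)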

Thanks,
Judy


Re: saveAsTable broken in v1.3 DataFrames?

2015-03-20 Thread Christian Perez
Any other users interested in a feature
DataFrame.saveAsExternalTable() for making _useful_ external tables in
Hive, or am I the only one? Bueller? If I start a PR for this, will it
be taken seriously?

On Thu, Mar 19, 2015 at 9:34 AM, Christian Perez christ...@svds.com wrote:
 Hi Yin,

 Thanks for the clarification. My first reaction is that if this is the
 intended behavior, it is a wasted opportunity. Why create a managed
 table in Hive that cannot be read from inside Hive? I think I
 understand now that you are essentially piggybacking on Hive's
 metastore to persist table info between/across sessions, but I imagine
 others might expect more (as I have.)

 We find ourselves wanting to do work in Spark and persist the results
 where other users (e.g. analysts using Tableau connected to
 Hive/Impala) can explore it. I imagine this is very common. I can, of
 course, save it as parquet and create an external table in hive (which
 I will do now), but saveAsTable seems much less useful to me now.
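
 A minimal sketch of that workaround (paths and table names are placeholders,
 and sc/df come from the spark-shell example quoted below) might look like:

   import org.apache.spark.sql.hive.HiveContext

   // Write Parquet from Spark, then register an external table that
   // Hive/Impala can read directly.
   val hiveContext = new HiveContext(sc)
   df.saveAsParquetFile("/user/hive/warehouse/spark_test_foo_parquet")
   hiveContext.sql("""
     CREATE EXTERNAL TABLE spark_test_foo (education BIGINT, income BIGINT)
     STORED AS PARQUET
     LOCATION '/user/hive/warehouse/spark_test_foo_parquet'
   """)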

 Any other opinions?

 Cheers,

 C

 On Thu, Mar 19, 2015 at 9:18 AM, Yin Huai yh...@databricks.com wrote:
 I meant table properties and serde properties are used to store metadata of
 a Spark SQL data source table. We do not set other fields like SerDe lib.
 For a user, the output of DESCRIBE EXTENDED/FORMATTED on a data source table
 should not show unrelated stuff like Serde lib and InputFormat. I have
 created https://issues.apache.org/jira/browse/SPARK-6413 to track the
 improvement on the output of DESCRIBE statement.

 On Thu, Mar 19, 2015 at 12:11 PM, Yin Huai yh...@databricks.com wrote:

 Hi Christian,

 Your table is stored correctly in Parquet format.

 For saveAsTable, the table created is not a Hive table, but a Spark SQL
 data source table
 (http://spark.apache.org/docs/1.3.0/sql-programming-guide.html#data-sources).
 We are only using Hive's metastore to store the metadata (to be specific,
 only table properties and serde properties). When you look at table
 property, there will be a field called spark.sql.sources.provider and the
 value will be org.apache.spark.sql.parquet.DefaultSource. You can also
 look at your files in the file system. They are stored by Parquet.

 Thanks,

 Yin

 On Thu, Mar 19, 2015 at 12:00 PM, Christian Perez christ...@svds.com
 wrote:

 Hi all,

 DataFrame.saveAsTable creates a managed table in Hive (v0.13 on
 CDH5.3.2) in both spark-shell and pyspark, but creates the *wrong*
 schema _and_ storage format in the Hive metastore, so that the table
 cannot be read from inside Hive. Spark itself can read the table, but
 Hive throws a Serialization error because it doesn't know it is
 Parquet.

 val df = sc.parallelize( Array((1,2), (3,4)) ).toDF("education", "income")
 df.saveAsTable("spark_test_foo")

 Expected:

 COLUMNS(
   education BIGINT,
   income BIGINT
 )

 SerDe Library:
 org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
 InputFormat:
 org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

 Actual:

 COLUMNS(
   col array<string> COMMENT 'from deserializer'
 )

 SerDe Library: org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
 InputFormat: org.apache.hadoop.mapred.SequenceFileInputFormat

 ---

 Manually changing schema and storage restores access in Hive and
 doesn't affect Spark. Note also that Hive's table property
 spark.sql.sources.schema is correct. At first glance, it looks like
 the schema data is serialized when sent to Hive but not deserialized
 properly on receive.

 I'm tracing execution through source code... but before I get any
 deeper, can anyone reproduce this behavior?

 Cheers,

 Christian

 --
 Christian Perez
 Silicon Valley Data Science
 Data Analyst
 christ...@svds.com
 @cp_phd

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






 --
 Christian Perez
 Silicon Valley Data Science
 Data Analyst
 christ...@svds.com
 @cp_phd



-- 
Christian Perez
Silicon Valley Data Science
Data Analyst
christ...@svds.com
@cp_phd

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

2015-03-20 Thread java8964
Do you think it could be the ulimit for the user running Spark on your nodes?
Can you run ulimit -a under the user who is running spark on the executor 
node? Does the result make sense for the data you are trying to process?
Yong
From: szheng.c...@gmail.com
To: user@spark.apache.org
Subject: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too 
large vs FileNotFoundException (Too many open files) on spark 1.2.1
Date: Fri, 20 Mar 2015 15:28:26 -0400

Hi All,

I try to run a simple sort by on 1.2.1, and it always gives me the two errors
below:

1, 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 35,
ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException:
/tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826
(Too many open files)

And then I switch to:
conf.set("spark.shuffle.consolidateFiles", "true")
    .set("spark.shuffle.manager", "SORT")

Then I get the error:
Exception in thread main org.apache.spark.SparkException: Job aborted due to
stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure: Lost
task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal):
com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large
        at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)

I roughly know the first issue is because the Spark shuffle creates too many
local temp files (and I don't know the solution, because it looks like my
workaround also causes other issues), but I am not sure what the second error
means. Anyone knows the solution for both cases?

Regards,

Shuai
  

EC2 cluster created by spark using old HDFS 1.0

2015-03-20 Thread morfious902002
Hi,
I created a cluster using the spark-ec2 script, but it installs HDFS version
1.0. I would like to use this cluster to connect to Hive installed on a
Cloudera CDH 5.3 cluster. But I am getting the following error:

org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot
communicate with client version 4
at org.apache.hadoop.ipc.Client.call(Client.java:1070)
at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225)
at com.sun.proxy.$Proxy10.getProtocolVersion(Unknown Source)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396)
at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379)
at
org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:238)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:203)
at
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89)
at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1404)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:254)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
at
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:176)
at
org.apache.hadoop.mapred.SequenceFileInputFormat.listStatus(SequenceFileInputFormat.java:40)
at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1511)
at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
at
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:83)
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:815)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:23)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:28)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
        at $iwC$$iwC$$iwC.<init>(<console>:36)
        at $iwC$$iwC.<init>(<console>:38)
        at $iwC.<init>(<console>:40)
        at <init>(<console>:42)
at 

com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

2015-03-20 Thread Shuai Zheng
Hi All,

 

I try to run a simple sort by on 1.2.1, and it always gives me the two
errors below:

 

1, 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID
35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException:
/tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6
b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19d
d-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b5326782
6 (Too many open files)

 

And then I switch to:

conf.set("spark.shuffle.consolidateFiles", "true")

    .set("spark.shuffle.manager", "SORT")

 

Then I get the error:

 

Exception in thread main org.apache.spark.SparkException: Job aborted due
to stage failure: Task 5 in stage 1.0 failed 4 times, most recent failure:
Lost task 5.3 in stage 1.0 (TID 36, ip-10-169-217-47.ec2.internal):
com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large

at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)

 

I roughly know the first issue is because the Spark shuffle creates too many
local temp files (and I don't know the solution, because it looks like my
workaround also causes other issues), but I am not sure what the second
error means.

 

Anyone knows the solution for both cases?

 

Regards,

 

Shuai



Mailing list schizophrenia?

2015-03-20 Thread Jim Kleckner
I notice that some people send messages directly to user@spark.apache.org
and some via nabble, either using email or the web client.

There are two index sites, one directly at apache.org and one at nabble.
But messages sent directly to user@spark.apache.org only show up in the
apache list.  Further, it appears that you can subscribe either directly to
user@spark.apache.org, in which you see all emails, or via nabble and you
see a subset.

Is this correct and is it intentional?

Apache site:
  http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/browser

Nabble site:
  http://apache-spark-user-list.1001560.n3.nabble.com/

An example of a message that only shows up in Apache:

http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3CCAGK53LnsD59wwQrP3-9yHc38C4eevAfMbV2so%2B_wi8k0%2Btq5HQ%40mail.gmail.com%3E


This message was sent both to Nabble and user@spark.apache.org to see how
that behaves.

Jim


Re: Mailing list schizophrenia?

2015-03-20 Thread Jim Kleckner
Yes, it did get delivered to the apache list shown here:

http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3CCAGK53LnsD59wwQrP3-9yHc38C4eevAfMbV2so%2B_wi8k0%2Btq5HQ%40mail.gmail.com%3E

But the web site for spark community directs people to nabble for viewing
messages and it doesn't show up there.

Community page: http://spark.apache.org/community.html

Link in that page to the archive:
http://apache-spark-user-list.1001560.n3.nabble.com/

The reliable archive:
http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/browser



On Fri, Mar 20, 2015 at 12:34 PM, Ted Yu yuzhih...@gmail.com wrote:

 Jim:
 I can find the example message here:
 http://search-hadoop.com/m/JW1q5zP54J1

 On Fri, Mar 20, 2015 at 12:29 PM, Jim Kleckner j...@cloudphysics.com
 wrote:

 I notice that some people send messages directly to user@spark.apache.org
 and some via nabble, either using email or the web client.

 There are two index sites, one directly at apache.org and one at
 nabble.  But messages sent directly to user@spark.apache.org only show
 up in the apache list.  Further, it appears that you can subscribe either
 directly to user@spark.apache.org, in which you see all emails, or via
 nabble and you see a subset.

 Is this correct and is it intentional?

 Apache site:
   http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/browser

 Nabble site:
   http://apache-spark-user-list.1001560.n3.nabble.com/

 An example of a message that only shows up in Apache:

 http://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3CCAGK53LnsD59wwQrP3-9yHc38C4eevAfMbV2so%2B_wi8k0%2Btq5HQ%40mail.gmail.com%3E


 This message was sent both to Nabble and user@spark.apache.org to see
 how that behaves.

 Jim





Re: com.esotericsoftware.kryo.KryoException: java.io.IOException: File too large vs FileNotFoundException (Too many open files) on spark 1.2.1

2015-03-20 Thread Charles Feduke
Assuming you are on Linux, what is your /etc/security/limits.conf set for
nofile/soft (number of open file handles)?

On Fri, Mar 20, 2015 at 3:29 PM Shuai Zheng szheng.c...@gmail.com wrote:

 Hi All,



 I try to run a simple sort by on 1.2.1, and it always gives me the two
 errors below:



 1, 15/03/20 17:48:29 WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID
 35, ip-10-169-217-47.ec2.internal): java.io.FileNotFoundException:
 /tmp/spark-e40bb112-3a08-4f62-9eaa-cd094fcfa624/spark-58f72d53-8afc-41c2-ad6b-e96b479b51f5/spark-fde6da79-0b51-4087-8234-2c07ac6d7586/spark-dd7d6682-19dd-4c66-8aa5-d8a4abe88ca2/16/temp_shuffle_756b59df-ef3a-4680-b3ac-437b53267826
 (Too many open files)



 And then I switch to:

 conf.set("spark.shuffle.consolidateFiles", "true")

     .set("spark.shuffle.manager", "SORT")



 Then I get the error:



 Exception in thread main org.apache.spark.SparkException: Job aborted
 due to stage failure: Task 5 in stage 1.0 failed 4 times, most recent
 failure: Lost task 5.3 in stage 1.0 (TID 36,
 ip-10-169-217-47.ec2.internal): com.esotericsoftware.kryo.KryoException:
 java.io.IOException: File too large

 at com.esotericsoftware.kryo.io.Output.flush(Output.java:157)



 I roughly know the first issue is because the Spark shuffle creates too many
 local temp files (and I don't know the solution, because it looks like my
 workaround also causes other issues), but I am not sure what the second
 error means.



 Anyone knows the solution for both cases?



 Regards,



 Shuai



Create a Spark cluster with cloudera CDH 5.2 support

2015-03-20 Thread morfious902002
Hi,
I am trying to create a Spark cluster using the spark-ec2 script which will
support 2.5.0-cdh5.3.2 for HDFS as well as Hive. I created a cluster by
adding --hadoop-major-version=2.5.0 which solved some of the errors I was
getting. But now when I run select query on hive I get the following error:-

Caused by: com.google.protobuf.InvalidProtocolBufferException: Message
missing required fields: callId, status

Has anybody tried doing this? Is there a solution?
I used this command to create my cluster:-
./spark-ec2 --key-pair=awskey --identity-file=awskey.pem
--instance-type=m3.xlarge --spot-price=0.08 --region=us-west-2
--zone=us-west-2c --hadoop-major-version=2.5.0-cdh5.3.2
--spark-version=1.3.0 --slaves=1 launch spark-cluster

Thank You for the help.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Create-a-Spark-cluster-with-cloudera-CDH-5-2-support-tp22168.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MLlib Spam example gets stuck in Stage X

2015-03-20 Thread Xiangrui Meng
Su, which Spark version did you use? -Xiangrui

On Thu, Mar 19, 2015 at 3:49 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
 To get these metrics out, you need to open the driver UI running on port
 4040. In there you will see the Stages information, and for each stage you
 can see how much time it is spending on GC etc. In your case, the
 parallelism seems to be 4; the higher the parallelism, the more tasks you
 will see.

 Thanks
 Best Regards

 On Thu, Mar 19, 2015 at 1:15 PM, Su She suhsheka...@gmail.com wrote:

 Hi Akhil,

 1) How could I see how much time it is spending on stage 1? Or what if,
 like above, it doesn't get past stage 1?

 2) How could I check if it's GC time? And where would I increase the
 parallelism for the model? I have a Spark Master and 2 Workers running on
 CDH 5.3... what would the default spark-shell level of parallelism be? I
 thought it would be 3.

 Thank you for the help!

 -Su


 On Thu, Mar 19, 2015 at 12:32 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Can you see where exactly it is spending time? Like you said, it goes to
 Stage 2, so you will be able to see how much time it spent on Stage 1. See
 if it's GC time; if so, try increasing the level of parallelism or
 repartition it, e.g. sc.getDefaultParallelism*3.
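
 A minimal sketch of that suggestion (spark-shell; trainingData is the
 RDD[LabeledPoint] from the example, the factor 3 is just the value mentioned
 above, and the Scala-side property is sc.defaultParallelism):

   import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

   // Spread the training data over more partitions and cache it before fitting.
   val repartitioned = trainingData.repartition(sc.defaultParallelism * 3).cache()
   val model = new LogisticRegressionWithSGD().run(repartitioned)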

 Thanks
 Best Regards

 On Thu, Mar 19, 2015 at 12:15 PM, Su She suhsheka...@gmail.com wrote:

 Hello Everyone,

 I am trying to run this MLlib example from Learning Spark:

 https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/MLlib.scala#L48

 Things I'm doing differently:

 1) Using spark shell instead of an application

 2) instead of their spam.txt and normal.txt I have text files with 3700
 and 2700 words...nothing huge at all and just plain text

 3) I've used numFeatures = 100, 1000 and 10,000

 Error: I keep getting stuck when I try to run the model:

 val model = new LogisticRegressionWithSGD().run(trainingData)

 It will freeze on something like this:

 [Stage 1:==(1 +
 0) / 4]

 Sometimes it's Stage 1, 2 or 3.

 I am not sure what I am doing wrong...any help is much appreciated,
 thank you!

 -Su






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Create a Spark cluster with cloudera CDH 5.2 support

2015-03-20 Thread Sean Owen
I think you missed -Phadoop-2.4

On Fri, Mar 20, 2015 at 5:27 PM, morfious902002 anubha...@gmail.com wrote:
 Hi,
 I am trying to create a Spark cluster using the spark-ec2 script which will
 support 2.5.0-cdh5.3.2 for HDFS as well as Hive. I created a cluster by
 adding --hadoop-major-version=2.5.0 which solved some of the errors I was
 getting. But now when I run select query on hive I get the following error:-

 Caused by: com.google.protobuf.InvalidProtocolBufferException: Message
 missing required fields: callId, status

 Has anybody tried doing this? Is there a solution?
 I used this command to create my cluster:-
 ./spark-ec2 --key-pair=awskey --identity-file=awskey.pem
 --instance-type=m3.xlarge --spot-price=0.08 --region=us-west-2
 --zone=us-west-2c --hadoop-major-version=2.5.0-cdh5.3.2
 --spark-version=1.3.0 --slaves=1 launch spark-cluster

 Thank You for the help.




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Create-a-Spark-cluster-with-cloudera-CDH-5-2-support-tp22168.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark per app logging

2015-03-20 Thread Ted Yu
Are these jobs the same jobs, just run by different users, or different
jobs? If the latter, can each application use its own log4j.properties?
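
If that is an option, a minimal sketch of pointing one application at its own
log4j file (the path is a placeholder and must exist on every node) could be:

  import org.apache.spark.SparkConf

  // Per-application log4j configuration for the executors. For the driver,
  // the equivalent option usually has to go on the spark-submit command line
  // or into spark-defaults.conf, since the driver JVM has already started by
  // the time this code runs.
  val conf = new SparkConf()
    .set("spark.executor.extraJavaOptions",
      "-Dlog4j.configuration=file:/etc/myapp/log4j.properties")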

Cheers

On Fri, Mar 20, 2015 at 1:43 PM, Udit Mehta ume...@groupon.com wrote:

 Hi,

 We have spark setup such that there are various users running multiple
 jobs at the same time. Currently all the logs go to 1 file specified in the
 log4j.properties.
 Is it possible to configure log4j in spark for per app/user logging
 instead of sending all logs to 1 file mentioned in the log4j.properties?

 Thanks
 Udit



Re: Error when using multiple python files spark-submit

2015-03-20 Thread Guillaume Charhon
I see. I will try the other way around.

On Thu, Mar 19, 2015 at 8:06 PM, Davies Liu dav...@databricks.com wrote:

 the options of spark-submit should come before main.py, or they will
 become the options of main.py, so it should be:

  ../hadoop/spark-install/bin/spark-submit --py-files

  
 /home/poiuytrez/naive.py,/home/poiuytrez/processing.py,/home/poiuytrez/settings.py
  --master spark://spark-m:7077 main.py

 On Mon, Mar 16, 2015 at 4:11 AM, poiuytrez guilla...@databerries.com
 wrote:
  I have a spark app which is composed of multiple files.
 
  When I launch Spark using:
 
  ../hadoop/spark-install/bin/spark-submit main.py --py-files
 
 /home/poiuytrez/naive.py,/home/poiuytrez/processing.py,/home/poiuytrez/settings.py
  --master spark://spark-m:7077
 
  I am getting an error:
 
  15/03/13 15:54:24 INFO TaskSetManager: Lost task 6.3 in stage 413.0
 (TID
  5817) on executor spark-w-3.c.databerries.internal:
  org.apache.spark.api.python.PythonException (Traceback (most recent call
  last):   File /home/hadoop/spark-install/python/pyspark/worker.py, line
  90, in main
  command = pickleSer._read_with_length(infile)   File
  /home/hadoop/spark-install/python/pyspark/serializers.py, line 151, in
  _read_with_length
  return self.loads(obj)   File
  /home/hadoop/spark-install/python/pyspark/serializers.py, line 396, in
  loads
  return cPickle.loads(obj) ImportError: No module named naive
 
  It is weird because I do not serialize anything. naive.py is also
 available
  on every machine at the same path.
 
  Any insight on what could be going on? The issue does not happen on my
  laptop.
 
  PS : I am using Spark 1.2.0.
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-using-multiple-python-files-spark-submit-tp22080.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



Re: Spark-submit and multiple files

2015-03-20 Thread Petar Zecevic


I tried your program in yarn-client mode and it worked with no 
exception. This is the command I used:


spark-submit --master yarn-client --py-files work.py main.py

(Spark 1.2.1)

On 20.3.2015. 9:47, Guillaume Charhon wrote:

Hi Davies,

I am already using --py-files. The system does use the other file. The 
error I am getting is not trivial. Please check the error log.




On Thu, Mar 19, 2015 at 8:03 PM, Davies Liu dav...@databricks.com wrote:


You could submit additional Python source via --py-files , for
example:

$ bin/spark-submit --py-files work.py main.py

On Tue, Mar 17, 2015 at 3:29 AM, poiuytrez
    guilla...@databerries.com wrote:
 Hello guys,

 I am having a hard time understanding how spark-submit behaves with multiple
 files. I have created two code snippets. Each code snippet is composed of a
 main.py and work.py. The code works if I paste work.py then main.py in a
 pyspark shell. However both snippets do not work when using spark-submit and
 generate different errors.

 Function add_1 definition outside
 http://www.codeshare.io/4ao8B
 https://justpaste.it/jzvj

 Embedded add_1 function definition
 http://www.codeshare.io/OQJxq
 https://justpaste.it/jzvn

 I am trying a way to make it work.

 Thank you for your support.



 --
 View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submit-and-multiple-files-tp22097.html
 Sent from the Apache Spark User List mailing list archive at
Nabble.com.


-
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







How to check that a dataset is sorted after it has been written out?

2015-03-20 Thread Michael Albert
Greetings!
I sorted a dataset in Spark and then wrote it out in avro/parquet.
Then I wanted to check that it was sorted.
It looks like each partition has been sorted, but when reading it back in, the
first partition (i.e., as seen in the partition index of mapPartitionsWithIndex)
is not the same as implied by the names of the parquet files (even when the
number of partitions is the same in the rdd which was read as on disk).
If I take() a few hundred values, they are sorted, but they are *not* the
same as if I explicitly open part-r-0.parquet and take values from that.
It seems that when opening the rdd, the partitions of the rdd are not in the
same order as implied by the data on disk (i.e., part-r-0.parquet,
part-r-1.parquet, etc).
So, how might one read the data so that one maintains the sort order?
And while on the subject, after the terasort, how did they check that the
data was actually sorted correctly? (or did they :-) ?)
Is there any way to read the data back in so as to preserve the sort, or do I
need to zipWithIndex before writing it out, and write the index at that time?
(I haven't tried the latter yet.)
Thanks!
-Mike
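
As a sketch (not from this thread) of one way to sanity-check ordering after
reading the data back: collect (partitionIndex, firstKey, lastKey, sortedWithin)
per partition and then verify keys are non-decreasing within and across
partitions. sortedRdd is a placeholder RDD[Long]; adapt the key type as needed.

  val bounds = sortedRdd.mapPartitionsWithIndex { (idx, iter) =>
    val arr = iter.toArray  // fine for a check, as long as one partition fits in memory
    if (arr.isEmpty) Iterator.empty
    else {
      val ordered = arr.sliding(2).forall {
        case Array(a, b) => a <= b
        case _           => true
      }
      Iterator((idx, arr.head, arr.last, ordered))
    }
  }.collect().sortBy(_._1)

  val sortedWithin = bounds.forall(_._4)
  val sortedAcross = bounds.sliding(2).forall {
    case Array(a, b) => a._3 <= b._2
    case _           => true
  }
  println(s"sorted within partitions: $sortedWithin, across partitions: $sortedAcross")

Note that this checks the order of the rdd as read, which is exactly what may
differ from the on-disk file order described above.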


IPyhon notebook command for spark need to be updated?

2015-03-20 Thread cong yue
Hello :

I tried ipython notebook with the following command in my environment.

PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=notebook
--pylab inline ./bin/pyspark

But it says --pylab inline support has been removed in the newest IPython
version. The log is as follows:
---
$ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=notebook
--pylab inline ./bin/pyspark
[E 15:29:43.076 NotebookApp] Support for specifying --pylab on the
command line has been removed.
[E 15:29:43.077 NotebookApp] Please use `%pylab inline` or
`%matplotlib inline` in the notebook itself.
--
I am using IPython 3.0.0, and only IPython works in my environment.
--
$ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=notebook
--pylab inline ./bin/pyspark
--

Does somebody have the same issue as mine? How do you solve it?

Thanks,
Cong

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: IPyhon notebook command for spark need to be updated?

2015-03-20 Thread Krishna Sankar
Yep, the command-line option is gone. No big deal, just add the '%pylab
inline' command
as part of your notebook.
Cheers
k/

On Fri, Mar 20, 2015 at 3:45 PM, cong yue yuecong1...@gmail.com wrote:

 Hello :

 I tried ipython notebook with the following command in my enviroment.

 PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=notebook
 --pylab inline ./bin/pyspark

 But it shows  --pylab inline support is removed from ipython newest
 version.
 the log is as :
 ---
 $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=notebook
 --pylab inline ./bin/pyspark
 [E 15:29:43.076 NotebookApp] Support for specifying --pylab on the
 command line has been removed.
 [E 15:29:43.077 NotebookApp] Please use `%pylab inline` or
 `%matplotlib inline` in the notebook itself.
 --
 I am using IPython 3.0.0. and only IPython works in my enviroment.
 --
 $ PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS=notebook
 --pylab inline ./bin/pyspark
 --

 Does somebody have the same issue as mine? How do you solve it?

 Thanks,
 Cong

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org