Re: sbt run with spark.ContextCleaner ERROR

2014-05-06 Thread wxhsdp
Hi, TD

I tried on v1.0.0-rc3 and still got the error.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sbt-run-with-spark-ContextCleaner-ERROR-tp5304p5421.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Better option to use Querying in Spark

2014-05-06 Thread prabeesh k
Thank you for your prompt reply.

Regards,
prabeesh


On Tue, May 6, 2014 at 11:44 AM, Mayur Rustagi mayur.rust...@gmail.comwrote:

 All three have different use cases. If you are looking for more of a
 warehouse, you are better off with Shark.
 Spark SQL is a way to query regular data with SQL-like syntax, leveraging a
 columnar store.

 BlinkDB is an experiment, meant to integrate with Shark in the long term.
 It is not meant for production use directly.
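
 For a flavor of the Spark SQL route, here is a minimal sketch (assuming the
 Spark 1.0-style SQLContext API and a SparkContext `sc` from the shell; the
 input file, case class and table name are just examples):

 import org.apache.spark.sql.SQLContext

 case class Person(name: String, age: Int)

 val sqlContext = new SQLContext(sc)
 import sqlContext.createSchemaRDD // implicit conversion RDD -> SchemaRDD

 val people = sc.textFile("people.txt") // example input, one "name,age" per line
   .map(_.split(","))
   .map(p => Person(p(0), p(1).trim.toInt))
 people.registerAsTable("people")

 // SQL query returning a SchemaRDD
 val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
 teenagers.collect().foreach(println)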


 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Tue, May 6, 2014 at 11:22 AM, prabeesh k prabsma...@gmail.com wrote:

  Hi,

 I have seen three different ways to query data from Spark

1. Default SQL support(

 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/sql/examples/HiveFromSpark.scala
)
2. Shark
3. Blink DB


 I would like to know which one is more efficient.

 Regards.
 prabeesh





Re: Increase Stack Size Workers

2014-05-06 Thread Matei Zaharia
Add export SPARK_JAVA_OPTS="-Xss16m" to conf/spark-env.sh. Then it should apply 
to the executor.
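
A minimal sketch of the same thing done programmatically (assuming a Spark
1.0-style SparkConf; the spark.executor.extraJavaOptions property may not exist
on 0.9.x, so treat it as an assumption):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("deep-recursion-job") // example app name
  // Ask each executor JVM for a 16 MB thread stack, same effect as SPARK_JAVA_OPTS="-Xss16m"
  .set("spark.executor.extraJavaOptions", "-Xss16m")
val sc = new SparkContext(conf)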

Matei


On May 5, 2014, at 2:20 PM, Andrea Esposito and1...@gmail.com wrote:

 Hi there,
 
 I'm running an iterative algorithm and sometimes I end up with a 
 StackOverflowError, regardless of whether I use checkpoints or not.
 
 While I still don't understand why this is happening, I figured out that 
 increasing the stack size is a workaround.
 
 When developing with local[n] (local mode) I can set the stack size 
 through the -Xss parameter. How can I do the same for each worker in standalone 
 mode? Setting it as java -Xss16m Worker seems useless because the 
 actual computation is done on the CoarseGrainedExecutor.
 
 Best,
 EA



run spark0.9.1 on yarn with hadoop CDH4

2014-05-06 Thread Sophia
Hi all,
 I have made HADOOP_CONF_DIR (or YARN_CONF_DIR) point to the directory which
contains the (client-side) configuration files for the Hadoop cluster. 
The command to launch the YARN Client which I run is like this:

#
SPARK_JAR=./~/spark-0.9.1/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar \
./bin/spark-class org.apache.spark.deploy.yarn.Client \
  --jar examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.1.jar \
  --class org.apache.spark.examples.SparkPi \
  --args yarn-standalone \
  --num-workers 3 \
  --master-memory 2g \
  --worker-memory 2g \
  --worker-cores 1
./bin/spark-class: line 152: /usr/lib/jvm/java-7-sun/bin/java: No such file
or directory
./bin/spark-class: line 152: exec: /usr/lib/jvm/java-7-sun/bin/java: cannot
execute: No such file or directory
How can I make it run correctly?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/run-spark0-9-1-on-yarn-with-hadoop-CDH4-tp5426.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: spark-shell driver interacting with Workers in YARN mode - firewall blocking communication

2014-05-06 Thread Andrew Lee
Hi Jacob,
I agree, we need to address both driver and workers bidirectionally.
If the subnet is isolated and self-contained, and only limited ports are configured 
to access the driver via a dedicated gateway for the user, could you explain 
your concern, or what doesn't satisfy the security criteria?
Are you referring to any security certificate or regulation requirement that a 
separate subnet with a configurable policy couldn't satisfy?
What I meant by a subnet is one that includes both the driver and the Workers 
running in it. See the following example setup (254 max nodes, for example):

Hadoop / HDFS = 10.5.5.0/24 (GW 10.5.5.1) on eth0
Spark Driver and Workers bind to 10.10.10.0/24 on eth1, with routing to 
10.5.5.0/24 on specific ports for the NameNode and DataNodes.

So basically the driver and Workers are bound to the same subnet, which is 
separated from the others. iptables for 10.10.10.0/24 can allow SSH (port 22) 
login (or port forwarding) onto the Spark Driver machine to launch a shell or 
submit Spark jobs.


Subject: RE: spark-shell driver interacting with Workers in YARN mode - 
firewall blocking communication
To: user@spark.apache.org
From: jeis...@us.ibm.com
Date: Mon, 5 May 2014 12:40:53 -0500


Howdy Andrew,



I agree; the subnet idea is a good one...  unfortunately, it doesn't really 
help to secure the network.



You mentioned that the drivers need to talk to the workers.  I think it is 
slightly broader - all of the workers and the driver/shell need to be 
addressable from/to each other on any dynamic port.



I would check out setting the environment variable SPARK_LOCAL_IP [1].  This 
seems to enable Spark to bind correctly to a private subnet.
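
A small sketch of the driver side of that idea (property names as I recall
them; the address and port are just examples, so treat this as an assumption
rather than a recipe):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("firewalled-driver")
  .set("spark.driver.host", "10.10.10.5") // advertise the driver on the private subnet
  .set("spark.driver.port", "51000")      // pin the driver port so firewall rules can reference it
val sc = new SparkContext(conf)

The executor-side dynamic ports remain the hard part, as discussed above.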



Jacob



[1]  http://spark.apache.org/docs/latest/configuration.html 



Jacob D. Eisinger

IBM Emerging Technologies

jeis...@us.ibm.com - (512) 286-6075



Andrew Lee ---05/04/2014 09:57:08 PM---Hi Jacob, Taking both concerns into 
account, I'm actually thinking about using a separate subnet to



From:   Andrew Lee alee...@hotmail.com

To: user@spark.apache.org user@spark.apache.org

Date:   05/04/2014 09:57 PM

Subject:RE: spark-shell driver interacting with Workers in YARN mode - 
firewall blocking communication








Hi Jacob,



Taking both concerns into account, I'm actually thinking about using a separate 
subnet to isolate the Spark Workers, but need to look into how to bind the 
process onto the correct interface first. This may require some code change.

A separate subnet doesn't restrict the port range, so port exhaustion should 
rarely happen, and it won't impact performance.



Opening up all ports between 32768-61000 is effectively the same as having no firewall. 
This exposes some security concerns, but I need more information on whether that is 
critical or not.



The bottom line is that the driver needs to talk to the Workers. How the user 
accesses the driver should be easier to solve, e.g. by launching the Spark (shell) 
driver on a specific interface.



Likewise, if you find any interesting solutions, please let me know. I'll 
share mine once I have something up and running. Currently, it is 
running OK with iptables off, but I still need to figure out how to 
productionize the security part.



Subject: RE: spark-shell driver interacting with Workers in YARN mode - 
firewall blocking communication

To: user@spark.apache.org

From: jeis...@us.ibm.com

Date: Fri, 2 May 2014 16:07:50 -0500



Howdy Andrew,



I think I am running into the same issue [1] as you.  It appears that Spark 
opens up dynamic / ephemeral [2] ports for each job on the shell and the 
workers.  As you are finding out, this makes securing and managing the network 
for Spark very difficult.



 Any idea how to restrict the 'Workers' port range?

The port range can be found by running: 
$ sysctl net.ipv4.ip_local_port_range

net.ipv4.ip_local_port_range = 32768 61000


With that being said, a couple of avenues you may try: 

1. Limit the dynamic ports [3] to a more reasonable number and open all of these 
ports on your firewall; obviously, this might have unintended consequences like 
port exhaustion. 
2. Secure the network another way, e.g. through a private VPN; this may reduce 
Spark's performance.


If you have other workarounds, I am all ears --- please let me know!

Jacob



[1] 
http://apache-spark-user-list.1001560.n3.nabble.com/Securing-Spark-s-Network-tp4832p4984.html

[2] http://en.wikipedia.org/wiki/Ephemeral_port

[3] 
http://www.cyberciti.biz/tips/linux-increase-outgoing-network-sockets-range.html



Jacob D. Eisinger

IBM Emerging Technologies

jeis...@us.ibm.com - (512) 286-6075



Andrew Lee ---05/02/2014 03:15:42 PM---Hi Yana,  I did. I configured the 
port in spark-env.sh, the problem is not the driver port which



From: Andrew Lee alee...@hotmail.com

To: user@spark.apache.org user@spark.apache.org

Date: 05/02/2014 03:15 PM

Subject: RE: spark-shell driver interacting with Workers in YARN mode - 
firewall blocking communication







Hi Yana, 



I did. I configured 

How can I run sbt?

2014-05-06 Thread Sophia
Hi all,
#./sbt/sbt assembly
Launching sbt from sbt/sbt-launch-0.12.4.jar
Invalid or corrupt jarfile sbt/sbt-launch-0.12.4.jar
Why can't I run sbt properly?
Best regards,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-can-I-run-sbt-tp5429.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark's behavior

2014-05-06 Thread Eduardo Costa Alfaia
Ok Andrew,
Thanks

I sent the results of a test with 8 workers and the gap has grown.

 
On May 4, 2014, at 2:31, Andrew Ash and...@andrewash.com wrote:

 From the logs, I see that the print() starts printing stuff 10 seconds 
 after the context is started. And that 10 seconds is taken by the initial 
 empty job (50 map + 20 reduce tasks) that spark streaming starts to ensure 
 all the executors have started. Somehow the first empty task takes 7-8 
 seconds to complete. See if this can be reproduced by running a simple, 
 empty job in spark shell (in the same cluster) and see if the first task 
 takes 7-8 seconds. 
 
 Either way, I didn't see the 30 second gap, but a 10 second gap. And that 
 does not seem to be a persistent problem as after that 10 seconds, the data 
 is being received and processed.
 
 TD


-- 
Informativa sulla Privacy: http://www.unibs.it/node/8155


Re: Storage information about an RDD from the API

2014-05-06 Thread Andras Nemeth
Thanks Koert, very useful!


On Tue, Apr 29, 2014 at 6:41 PM, Koert Kuipers ko...@tresata.com wrote:

 SparkContext.getRDDStorageInfo
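
 A quick sketch of how that can be used (field names as I recall them from the
 RDDInfo class, so double-check against your Spark version):

 sc.getRDDStorageInfo.foreach { info =>
   // numCachedPartitions vs numPartitions tells you how much of the RDD is actually cached
   println(s"${info.name}: ${info.numCachedPartitions}/${info.numPartitions} partitions cached, " +
     s"mem=${info.memSize} bytes, disk=${info.diskSize} bytes")
 }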


 On Tue, Apr 29, 2014 at 12:34 PM, Andras Nemeth 
 andras.nem...@lynxanalytics.com wrote:

 Hi,

 Is it possible to know from code about an RDD if it is cached, and more
 precisely, how many of its partitions are cached in memory and how many are
 cached on disk? I know I can get the storage level, but I also want to know
 the current actual caching status. Knowing memory consumption would also be
 awesome. :)

 Basically what I'm looking for is the information on the storage tab of
 the UI, but accessible from the API.

 Thanks,
 Andras





Re: Incredible slow iterative computation

2014-05-06 Thread Andrea Esposito
Thanks all for helping.
Following Earthson's tip, I resolved it. I have to report that if you
materialize the RDD first and only afterwards try to checkpoint it, the
checkpoint never takes effect:

newRdd = oldRdd.map(myFun).persist(myStorageLevel)
newRdd.foreach(x => myFunLogic(x)) // here materialized for other reasons
...
if (condition) { // afterwards I would like to checkpoint
  newRdd.checkpoint
  newRdd.isCheckpointed // false here
  newRdd.foreach(x => {}) // force evaluation
  newRdd.isCheckpointed // still false here
}
oldRdd.unpersist(true)


2014-05-06 3:35 GMT+02:00 Earthson earthson...@gmail.com:

 checkpoint seems to just add a checkpoint mark? You need an action after
 marking it. I have tried it with success :)

 newRdd = oldRdd.map(myFun).persist(myStorageLevel)
 newRdd.checkpoint // checkpoint here
 newRdd.isCheckpointed // false here
 newRdd.foreach(x => {}) // Force evaluation
 newRdd.isCheckpointed // true here
 oldRdd.unpersist(true)


 

 If you create a new broadcast object for each step of the iteration, broadcasts will
 eat up all of the memory. You may need to set spark.cleaner.ttl to a small
 enough value.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Incredible-slow-iterative-computation-tp4204p5407.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



KryoSerializer Exception

2014-05-06 Thread Andrea Esposito
Hi there,

Sorry if I'm posting a lot lately.

I'm trying to add the KryoSerializer but I receive this exception:
2014-05-06 11:45:23 WARN TaskSetManager:62 - Loss was due to java.io.EOFException
java.io.EOFException
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:105)
at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)
at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)

I set the serializer as:
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "test.TestKryoRegistrator")

With or without registering my custom registrator, it throws the exception.

It seems to be something related to broadcast... but isn't Kryo already OK out of the
box just by setting it as the default serializer?
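
For reference, this is roughly what the registrator looks like on my side (a
sketch; the registered class is a stand-in and the package declaration is
omitted):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Stand-in for whatever custom classes are shipped around
case class MyRecord(id: Int, label: String)

class TestKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyRecord])
  }
}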


If it due to my file has been breakdown?

2014-05-06 Thread Sophia
Hi all,
[root@sophia spark-0.9.1]#
SPARK_JAR=.assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar \
./bin/spark-class org.apache.spark.deploy.yarn.Client \
  --jar examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.1.jar \
  --class org.apache.spark.examples.SparkPi \
  --args yarn-standalone \
  --num-workers 3 \
  --master-memory 2g \
  --worker-memory 2g \
  --worker-cores 1
./bin/spark-class: line 152: /usr/java/jdk1.7.0_25/bin/java: No such file or
directory
./bin/spark-class: line 152: exec: /usr/java/jdk1.7.0_25/bin/java: cannot
execute: No such file or directory
Is it because my file is broken?
What can I do about it?
Best regards,





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/If-it-due-to-my-file-has-been-breakdown-tp5438.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: about broadcast

2014-05-06 Thread randylu
I found that the small broadcast variable always takes about 10s, not 5s or
anything else.
Is there some property/conf (whose default is 10) that controls this
timeout?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/about-broadcast-tp5416p5439.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: If it due to my file has been breakdown?

2014-05-06 Thread Mayur Rustagi
Most likely your JAVA_HOME variable is wrong. Can you configure that in the
spark-env.sh file?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Tue, May 6, 2014 at 5:53 PM, Sophia sln-1...@163.com wrote:

 Hi all,
 [root@sophia spark-0.9.1]#

 SPARK_JAR=.assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar
 ./bin/spark-class org.apache.spark.deploy.yarn.Client\--jar
 examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.1.jar\--class
 org.apache.spark.examples.SparkPi\--args yarn-standalone \--num-workers 3
 \--master-memory 2g \--worker-memory 2g \--worker-cores 1
 ./bin/spark-class: line 152: /usr/java/jdk1.7.0_25/bin/java: No such file
 or
 directory
 ./bin/spark-class: line 152: exec: /usr/java/jdk1.7.0_25/bin/java: cannot
 execute: No such file or directory
 If it due to my file has been breakdown?
 How can I do with it?
 Best regards,





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/If-it-due-to-my-file-has-been-breakdown-tp5438.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: If it due to my file has been breakdown?

2014-05-06 Thread Sophia
I have modified it in spark-env.sh, but it turns out that it does not work. So
confused.
Best Regards



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/If-it-due-to-my-file-has-been-breakdown-tp5438p5442.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark and Java 8

2014-05-06 Thread Kristoffer Sjögren
Hi

I just read an article [1] about Spark, CDH5 and Java 8 but did not get
exactly how Spark can run Java 8 on a YARN cluster at runtime. Is Spark
using a separate JVM that runs on data nodes or is it reusing the YARN JVM
runtime somehow, like hadoop1?

CDH5 only supports Java 7 [2] as far as I know?

Cheers,
-Kristoffer


[1]
http://blog.cloudera.com/blog/2014/04/making-apache-spark-easier-to-use-in-java-with-java-8/
[2]
http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH5/latest/CDH5-Requirements-and-Supported-Versions/CDH5-Requirements-and-Supported-Versions.html


Re: Local Dev Env with Mesos + Spark Streaming on Docker: Can't submit jobs.

2014-05-06 Thread Jacob Eisinger

Howdy,

You might find the discussion Andrew and I have been having about Docker
and network security [1] applicable.

Also, I posted an answer [2] to your stackoverflow question.

[1]
http://apache-spark-user-list.1001560.n3.nabble.com/spark-shell-driver-interacting-with-Workers-in-YARN-mode-firewall-blocking-communication-tp5237p5441.html
[2]
http://stackoverflow.com/questions/23410505/how-to-run-hdfs-cluster-without-dns/23495100#23495100

Jacob D. Eisinger
IBM Emerging Technologies
jeis...@us.ibm.com - (512) 286-6075



From:   Gerard Maas gerard.m...@gmail.com
To: user@spark.apache.org
Date:   05/05/2014 04:18 PM
Subject:Re: Local Dev Env with Mesos + Spark Streaming on Docker: Can't
submit jobs.



Hi Benjamin,

Yes, we initially used a modified version of the AmpLabs docker scripts
[1]. The amplab docker images are a good starting point.
One of the biggest hurdles has been HDFS, which requires reverse-DNS and I
didn't want to go the dnsmasq route to keep the containers relatively
simple to use without the need of external scripts. Ended up running a
1-node setup nnode+dnode. I'm still looking for a better solution for HDFS
[2]

Our usecase using docker is to easily create local dev environments both
for development and for automated functional testing (using cucumber). My
aim is to strongly reduce the time of the develop-deploy-test cycle.
That  also means that we run the minimum number of instances required to
have a functionally working setup. E.g. 1 Zookeeper, 1 Kafka broker, ...

For the actual cluster deployment we have Chef-based devops toolchain that
put things in place on public cloud providers.
Personally, I think Docker rocks and would like to replace those complex
cookbooks with Dockerfiles once the technology is mature enough.

-greetz, Gerard.

[1] https://github.com/amplab/docker-scripts
[2]
http://stackoverflow.com/questions/23410505/how-to-run-hdfs-cluster-without-dns


On Mon, May 5, 2014 at 11:00 PM, Benjamin bboui...@gmail.com wrote:
  Hi,

  Before considering running on Mesos, did you try to submit the
  application on Spark deployed without Mesos on Docker containers?

  Currently investigating this idea to deploy quickly a complete set of
  clusters with Docker, I'm interested in your findings on sharing the
  settings of Kafka and Zookeeper across nodes. How many brokers and
  zookeepers do you use?

  Regards,



  On Mon, May 5, 2014 at 10:11 PM, Gerard Maas gerard.m...@gmail.com
  wrote:
   Hi all,

   I'm currently working on creating a set of docker images to facilitate
   local development with Spark/streaming on Mesos (+zk, hdfs, kafka)

   After solving the initial hurdles to get things working together in
   docker containers, now everything seems to start-up correctly and the
   mesos UI shows slaves as they are started.

   I'm trying to submit a job from IntelliJ and the job submissions seem
   to get lost in Mesos translation. The logs are not helping me to figure
   out what's wrong, so I'm posting them here in the hope that they can
   ring a bell and somebody could provide me a hint on what's wrong/missing
   with my setup.


    DRIVER (IntelliJ running a Job.scala main) 
   14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for
   SHUFFLE_BLOCK_MANAGER
   14/05/05 21:52:31 INFO BlockManager: Dropping broadcast blocks older
   than 1399319251962
   14/05/05 21:52:31 INFO BlockManager: Dropping non broadcast blocks older
   than 1399319251962
   14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for
   BROADCAST_VARS
   14/05/05 21:52:31 INFO MetadataCleaner: Ran metadata cleaner for
   BLOCK_MANAGER
   14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for
   HTTP_BROADCAST
   14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for
   MAP_OUTPUT_TRACKER
   14/05/05 21:52:32 INFO MetadataCleaner: Ran metadata cleaner for
   SPARK_CONTEXT


    MESOS MASTER 
   I0505 19:52:39.718080   388 master.cpp:690] Registering framework
   201405051517-67113388-5050-383-6995 at scheduler(1)@127.0.1.1:58115
   I0505 19:52:39.718261   388 master.cpp:493] Framework
   201405051517-67113388-5050-383-6995 disconnected
   I0505 19:52:39.718277   389 hierarchical_allocator_process.hpp:332]
   Added framework 201405051517-67113388-5050-383-6995
   I0505 19:52:39.718312   388 master.cpp:520] Giving framework
   201405051517-67113388-5050-383-6995 0ns to failover
   I0505 19:52:39.718431   389 hierarchical_allocator_process.hpp:408]
   Deactivated framework 201405051517-67113388-5050-383-6995
   W0505 19:52:39.718459   388 master.cpp:1388] Master returning resources
   offered to framework 201405051517-67113388-5050-383-6995 because the
   framework has terminated or is inactive
   I0505 19:52:39.718567   388 master.cpp:1376] Framework failover timeout,
   removing framework 201405051517-67113388-5050-383-6995



    MESOS SLAVE 
   I0505 19:49:27.662019    20 slave.cpp:1191] Asked to shut down 

Re: Comprehensive Port Configuration reference?

2014-05-06 Thread Jacob Eisinger

Howdy Scott,

Please see the discussions about securing the Spark network [1] [2].

In a nutshell, Spark opens up a couple of well-known ports.  And then the
workers and the shell open up dynamic ports for each job.  These dynamic
ports make securing the Spark network difficult.

Jacob

[1]
http://apache-spark-user-list.1001560.n3.nabble.com/Securing-Spark-s-Network-td4832.html
[2]
http://apache-spark-user-list.1001560.n3.nabble.com/spark-shell-driver-interacting-with-Workers-in-YARN-mode-firewall-blocking-communication-td5237.html

Jacob D. Eisinger
IBM Emerging Technologies
jeis...@us.ibm.com - (512) 286-6075



From:   Scott Clasen scott.cla...@gmail.com
To: u...@spark.incubator.apache.org
Date:   05/05/2014 11:39 AM
Subject:Comprehensive Port Configuration reference?



Is there somewhere documented how one would go about configuring every open
port a spark application needs?

This seems like one of the main things that make running spark hard in
places like EC2 where you arent using the canned spark scripts.

Starting an app looks like you'll see ports open for

BlockManager
OutputTracker
FileServer
WebUI
Local port to get callbacks from mesos master..

What else?

How do I configure all of these?



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Comprehensive-Port-Configuration-reference-tp5384.html

Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark and Java 8

2014-05-06 Thread Marcelo Vanzin
Hi Kristoffer,

You're correct that CDH5 only supports up to Java 7 at the moment. But
Yarn apps do not run in the same JVM as Yarn itself (and I believe MR1
doesn't either), so it might be possible to pass arguments in a way
that tells Yarn to launch the application master / executors with the
Java 8 runtime. I have never tried this, so I don't know if it's
really possible, and it's obviously not supported (also because Java 8
support is part of Spark 1.0 which hasn't been released yet).

You're welcome to try it out, and if you get it to work in some
manner, it would be great to hear back.


On Tue, May 6, 2014 at 6:16 AM, Kristoffer Sjögren sto...@gmail.com wrote:
 Hi

 I just read an article [1] about Spark, CDH5 and Java 8 but did not get
 exactly how Spark can run Java 8 on a YARN cluster at runtime. Is Spark
 using a separate JVM that run on data nodes or is it reusing the YARN JVM
 runtime somehow, like hadoop1?

 CDH5 only supports Java 7 [2] as far as I know?

 Cheers,
 -Kristoffer


 [1]
 http://blog.cloudera.com/blog/2014/04/making-apache-spark-easier-to-use-in-java-with-java-8/
 [2]
 http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH5/latest/CDH5-Requirements-and-Supported-Versions/CDH5-Requirements-and-Supported-Versions.html







-- 
Marcelo


Re: Spark and Java 8

2014-05-06 Thread Ian O'Connell
I think the distinction there might be they never said they ran that code
under CDH5, just that spark supports it and spark runs under CDH5. Not that
you can use these features while running under CDH5.

They could use mesos or the standalone scheduler to run them


On Tue, May 6, 2014 at 6:16 AM, Kristoffer Sjögren sto...@gmail.com wrote:

 Hi

 I just read an article [1] about Spark, CDH5 and Java 8 but did not get
 exactly how Spark can run Java 8 on a YARN cluster at runtime. Is Spark
 using a separate JVM that run on data nodes or is it reusing the YARN JVM
 runtime somehow, like hadoop1?

 CDH5 only supports Java 7 [2] as far as I know?

 Cheers,
 -Kristoffer


 [1]
 http://blog.cloudera.com/blog/2014/04/making-apache-spark-easier-to-use-in-java-with-java-8/
 [2]
 http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH5/latest/CDH5-Requirements-and-Supported-Versions/CDH5-Requirements-and-Supported-Versions.html







No space left on device error when pulling data from s3

2014-05-06 Thread Han JU
Hi,

I've a `no space left on device` exception when pulling some 22GB data from
s3 block storage to the ephemeral HDFS. The cluster is on EC2 using
spark-ec2 script with 4 m1.large.

The code is basically:
  val in = sc.textFile("s3://...")
  in.saveAsTextFile("hdfs://...")

Spark creates 750 input partitions based on the input splits. When it
begins throwing this exception, there's no space left on the root file
system of some worker machine:

Filesystem   1K-blocks  Used Available Use% Mounted on
/dev/xvda1 8256952   8256952 0 100% /
tmpfs  3816808 0   3816808   0% /dev/shm
/dev/xvdb433455904  29840684 381596916   8% /mnt
/dev/xvdf433455904  29437000 382000600   8% /mnt2

Before the job begins, only 35% is used.

Filesystem   1K-blocks  Used Available Use% Mounted on
/dev/xvda1 8256952   2832256   5340840  35% /
tmpfs  3816808 0   3816808   0% /dev/shm
/dev/xvdb433455904  29857768 381579832   8% /mnt
/dev/xvdf433455904  29470104 381967496   8% /mnt2


Any suggestions on this problem? Does Spark cache/store some data before
writing to HDFS?


Full stacktrace:
-
java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:345)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at
org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveBlock(Jets3tFileSystemStore.java:210)
at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
at com.sun.proxy.$Proxy8.retrieveBlock(Unknown Source)
at org.apache.hadoop.fs.s3.S3InputStream.blockSeekTo(S3InputStream.java:160)
at org.apache.hadoop.fs.s3.S3InputStream.read(S3InputStream.java:119)
at java.io.DataInputStream.read(DataInputStream.java:100)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
at
org.apache.hadoop.mapred.LineRecordReader.init(LineRecordReader.java:92)
at
org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:51)
at org.apache.spark.rdd.HadoopRDD$$anon$1.init(HadoopRDD.scala:156)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)


-- 
*JU Han*

Data Engineer @ Botify.com

+33 061960


Re: is Mesos falling out of favor?

2014-05-06 Thread deric
I guess it's due to missing documentation and a quite complicated setup.
Continuous integration would be nice! 

Btw, is it possible to use Spark as a shared library and not fetch the Spark
tarball for each task?

Do you point SPARK_EXECUTOR_URI to an HDFS URL?
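
For what it's worth, this is a sketch of how I understand the executor URI can
be set from application code (the Mesos master and HDFS path are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("mesos-app")
  .setMaster("mesos://zk://zk1:2181/mesos") // example Mesos master URL
  // Executors fetch this tarball; hosting it on HDFS avoids re-distributing it from the submit host
  .set("spark.executor.uri", "hdfs://namenode:8020/dist/spark-0.9.1-bin.tar.gz")
val sc = new SparkContext(conf)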



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/is-Mesos-falling-out-of-favor-tp5444p5448.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: run spark0.9.1 on yarn with hadoop CDH4

2014-05-06 Thread Andrew Lee
Please check JAVA_HOME. Usually it should point to /usr/java/default on 
CentOS/Linux.
FYI: http://stackoverflow.com/questions/1117398/java-home-directory


 Date: Tue, 6 May 2014 00:23:02 -0700
 From: sln-1...@163.com
 To: u...@spark.incubator.apache.org
 Subject: run spark0.9.1 on yarn with hadoop CDH4
 
 Hi all,
  I have make HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which
 contains the (client side) configuration files for the hadoop cluster. 
 The command to launch the YARN Client which I run is like this:
 
 #
 SPARK_JAR=./~/spark-0.9.1/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar
 ./bin/spark-class org.apache.spark.deploy.yarn.Client\--jar
 examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.1.jar\--class
 org.apache.spark.examples.SparkPi\--args yarn-standalone \--num-workers 3
 \--master-memory 2g \--worker-memory 2g \--worker-cores 1
 ./bin/spark-class: line 152: /usr/lib/jvm/java-7-sun/bin/java: No such file
 or directory
 ./bin/spark-class: line 152: exec: /usr/lib/jvm/java-7-sun/bin/java: cannot
 execute: No such file or directory
 How to make it runs well?
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/run-spark0-9-1-on-yarn-with-hadoop-CDH4-tp5426.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
  

Re: No space left on device error when pulling data from s3

2014-05-06 Thread Akhil Das
I wonder why your / is full. Try clearing out /tmp, and also make sure that in
spark-env.sh you have set
SPARK_JAVA_OPTS+=" -Dspark.local.dir=/mnt/spark"
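
A programmatic sketch of the same idea, for jobs that build their own
SparkContext (the directories are examples for the EC2 ephemeral mounts):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("s3-to-hdfs")
  // Put shuffle/spill scratch space on the big ephemeral mounts instead of the small root volume
  .set("spark.local.dir", "/mnt/spark,/mnt2/spark")
val sc = new SparkContext(conf)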

Thanks
Best Regards


On Tue, May 6, 2014 at 9:35 PM, Han JU ju.han.fe...@gmail.com wrote:

 Hi,

 I've a `no space left on device` exception when pulling some 22GB data
 from s3 block storage to the ephemeral HDFS. The cluster is on EC2 using
 spark-ec2 script with 4 m1.large.

 The code is basically:
   val in = sc.textFile(s3://...)
   in.saveAsTextFile(hdfs://...)

 Spark creates 750 input partitions based on the input splits, when it
 begins throwing this exception, there's no space left on the root file
 system on some worker machine:

 Filesystem   1K-blocks  Used Available Use% Mounted on
 /dev/xvda1 8256952   8256952 0 100% /
 tmpfs  3816808 0   3816808   0% /dev/shm
 /dev/xvdb433455904  29840684 381596916   8% /mnt
 /dev/xvdf433455904  29437000 382000600   8% /mnt2

 Before the job begins, only 35% is used.

 Filesystem   1K-blocks  Used Available Use% Mounted on
 /dev/xvda1 8256952   2832256   5340840  35% /
 tmpfs  3816808 0   3816808   0% /dev/shm
 /dev/xvdb433455904  29857768 381579832   8% /mnt
 /dev/xvdf433455904  29470104 381967496   8% /mnt2


 Some suggestions on this problem? Does Spark caches/stores some data
 before writing to HDFS?


 Full stacktrace:
 -
 java.io.IOException: No space left on device
 at java.io.FileOutputStream.writeBytes(Native Method)
  at java.io.FileOutputStream.write(FileOutputStream.java:345)
 at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
  at
 org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveBlock(Jets3tFileSystemStore.java:210)
 at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
  at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
  at
 org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
 at
 org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
  at com.sun.proxy.$Proxy8.retrieveBlock(Unknown Source)
 at
 org.apache.hadoop.fs.s3.S3InputStream.blockSeekTo(S3InputStream.java:160)
  at org.apache.hadoop.fs.s3.S3InputStream.read(S3InputStream.java:119)
 at java.io.DataInputStream.read(DataInputStream.java:100)
  at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
 at
 org.apache.hadoop.mapred.LineRecordReader.init(LineRecordReader.java:92)
  at
 org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:51)
 at org.apache.spark.rdd.HadoopRDD$$anon$1.init(HadoopRDD.scala:156)
  at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
 at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
  at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
 at org.apache.spark.scheduler.Task.run(Task.scala:53)
  at
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
 at
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)


 --
 *JU Han*

 Data Engineer @ Botify.com

 +33 061960



Re: No space left on device error when pulling data from s3

2014-05-06 Thread Han JU
After some investigation, I found out that there are lots of temp files under

/tmp/hadoop-root/s3/

But this is strange, since in both conf files,
~/ephemeral-hdfs/conf/core-site.xml and ~/spark/conf/core-site.xml, the
setting `hadoop.tmp.dir` is set to `/mnt/ephemeral-hdfs/`. Why do Spark jobs
still write temp files to /tmp/hadoop-root?


2014-05-06 18:05 GMT+02:00 Han JU ju.han.fe...@gmail.com:

 Hi,

 I've a `no space left on device` exception when pulling some 22GB data
 from s3 block storage to the ephemeral HDFS. The cluster is on EC2 using
 spark-ec2 script with 4 m1.large.

 The code is basically:
   val in = sc.textFile(s3://...)
   in.saveAsTextFile(hdfs://...)

 Spark creates 750 input partitions based on the input splits, when it
 begins throwing this exception, there's no space left on the root file
 system on some worker machine:

 Filesystem   1K-blocks  Used Available Use% Mounted on
 /dev/xvda1 8256952   8256952 0 100% /
 tmpfs  3816808 0   3816808   0% /dev/shm
 /dev/xvdb433455904  29840684 381596916   8% /mnt
 /dev/xvdf433455904  29437000 382000600   8% /mnt2

 Before the job begins, only 35% is used.

 Filesystem   1K-blocks  Used Available Use% Mounted on
 /dev/xvda1 8256952   2832256   5340840  35% /
 tmpfs  3816808 0   3816808   0% /dev/shm
 /dev/xvdb433455904  29857768 381579832   8% /mnt
 /dev/xvdf433455904  29470104 381967496   8% /mnt2


 Some suggestions on this problem? Does Spark caches/stores some data
 before writing to HDFS?


 Full stacktrace:
 -
 java.io.IOException: No space left on device
 at java.io.FileOutputStream.writeBytes(Native Method)
  at java.io.FileOutputStream.write(FileOutputStream.java:345)
 at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
  at
 org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveBlock(Jets3tFileSystemStore.java:210)
 at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
  at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
  at
 org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
 at
 org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
  at com.sun.proxy.$Proxy8.retrieveBlock(Unknown Source)
 at
 org.apache.hadoop.fs.s3.S3InputStream.blockSeekTo(S3InputStream.java:160)
  at org.apache.hadoop.fs.s3.S3InputStream.read(S3InputStream.java:119)
 at java.io.DataInputStream.read(DataInputStream.java:100)
  at org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)
 at
 org.apache.hadoop.mapred.LineRecordReader.init(LineRecordReader.java:92)
  at
 org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:51)
 at org.apache.spark.rdd.HadoopRDD$$anon$1.init(HadoopRDD.scala:156)
  at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
 at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
  at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
 at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
 at org.apache.spark.scheduler.Task.run(Task.scala:53)
  at
 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
 at
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)


 --
 *JU Han*

 Data Engineer @ Botify.com

 +33 061960




-- 
*JU Han*

Data Engineer @ Botify.com

+33 061960


Re: Spark and Java 8

2014-05-06 Thread Matei Zaharia
Java 8 support is a feature in Spark, but vendors need to decide for themselves 
when they'd like to support Java 8 commercially. You can still run Spark on Java 7 
or 6 without taking advantage of the new features (indeed our builds are always 
against Java 6).

Matei

On May 6, 2014, at 8:59 AM, Ian O'Connell i...@ianoconnell.com wrote:

 I think the distinction there might be they never said they ran that code 
 under CDH5, just that spark supports it and spark runs under CDH5. Not that 
 you can use these features while running under CDH5.
 
 They could use mesos or the standalone scheduler to run them
 
 
 On Tue, May 6, 2014 at 6:16 AM, Kristoffer Sjögren sto...@gmail.com wrote:
 Hi
 
 I just read an article [1] about Spark, CDH5 and Java 8 but did not get 
 exactly how Spark can run Java 8 on a YARN cluster at runtime. Is Spark using 
 a separate JVM that run on data nodes or is it reusing the YARN JVM runtime 
 somehow, like hadoop1?
 
 CDH5 only supports Java 7 [2] as far as I know?
 
 Cheers,
 -Kristoffer
 
 
 [1] 
 http://blog.cloudera.com/blog/2014/04/making-apache-spark-easier-to-use-in-java-with-java-8/
 [2] 
 http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH5/latest/CDH5-Requirements-and-Supported-Versions/CDH5-Requirements-and-Supported-Versions.html
 
 
 
 
 



Spark Summit 2014 (Hotel suggestions)

2014-05-06 Thread Jerry Lam
Hi Spark users,

Do you guys plan to go to the Spark Summit? Can you recommend any hotel near
the conference? I'm not familiar with the area.

Thanks!

Jerry


logging in pyspark

2014-05-06 Thread Diana Carroll
What should I do if I want to log something as part of a task?

This is what I tried.  To set up a logger, I followed the advice here:
http://py4j.sourceforge.net/faq.html#how-to-turn-logging-on-off

logger = logging.getLogger("py4j")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

This works fine when I call it from my driver (i.e. pyspark):
logger.info("this works fine")

But I want to try logging within a distributed task so I did this:

def logTestMap(a):
    logger.info("test")
    return a

myrdd.map(logTestMap).count()

and got:
PicklingError: Can't pickle 'lock' object

So it's trying to serialize my function and can't because of a lock object
used in logger, presumably for thread-safeness.  But then...how would I do
it?  Or is this just a really bad idea?

Thanks
Diana


Re: logging in pyspark

2014-05-06 Thread Nicholas Chammas
I think you're looking for RDD.foreach():
http://spark.apache.org/docs/latest/api/pyspark/pyspark.rdd.RDD-class.html#foreach

According to the programming guide
(http://spark.apache.org/docs/latest/scala-programming-guide.html):

Run a function func on each element of the dataset. This is usually done
 for side effects such as updating an accumulator variable (see below) or
 interacting with external storage systems.


Do you really want to log something for each element of your RDD?

Nick


On Tue, May 6, 2014 at 3:31 PM, Diana Carroll dcarr...@cloudera.com wrote:

 What should I do if I want to log something as part of a task?

 This is what I tried.  To set up a logger, I followed the advice here:
 http://py4j.sourceforge.net/faq.html#how-to-turn-logging-on-off

 logger = logging.getLogger("py4j")
 logger.setLevel(logging.INFO)
 logger.addHandler(logging.StreamHandler())

 This works fine when I call it from my driver (i.e. pyspark):
 logger.info("this works fine")

 But I want to try logging within a distributed task so I did this:

 def logTestMap(a):
     logger.info("test")
     return a

 myrdd.map(logTestMap).count()

 and got:
 PicklingError: Can't pickle 'lock' object

 So it's trying to serialize my function and can't because of a lock object
 used in logger, presumably for thread-safeness.  But then...how would I do
 it?  Or is this just a really bad idea?

 Thanks
 Diana



Easy one

2014-05-06 Thread Ian Ferreira
Hi there,

Why can't I seem to kick the executor memory higher? See below from EC2
deployment using m1.large


And in the spark-env.sh
export SPARK_MEM=6154m


And in the spark context
sconf.setExecutorEnv("spark.executor.memory", "4g")

Cheers
- Ian





Re: Easy one

2014-05-06 Thread Aaron Davidson
If you're using standalone mode, you need to make sure the Spark Workers
know about the extra memory. This can be configured in spark-env.sh on the
workers as

export SPARK_WORKER_MEMORY=4g
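
On the application side, a sketch of setting the executor memory as a Spark
property via SparkConf (as far as I know, setExecutorEnv only sets OS
environment variables for the executors, so it won't pick this up):

import org.apache.spark.{SparkConf, SparkContext}

val sconf = new SparkConf()
  .setAppName("easy-one")
  .setMaster("spark://master-host:7077") // example standalone master URL
  .set("spark.executor.memory", "4g")    // request 4 GB per executor
val sc = new SparkContext(sconf)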


On Tue, May 6, 2014 at 5:29 PM, Ian Ferreira ianferre...@hotmail.comwrote:

 Hi there,

 Why can’t I seem to kick the executor memory higher? See below from EC2
 deployment using m1.large


 And in the spark-env.sh
 export SPARK_MEM=6154m


 And in the spark context
  sconf.setExecutorEnv("spark.executor.memory", "4g")

 Cheers
 - Ian




Re: How to read a multipart s3 file?

2014-05-06 Thread Andre Kuhnen
Try using s3n instead of s3
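
A sketch of what reading it back looks like with s3n (assuming the data was
also written with s3n; the s3:// block-store layout is not directly readable
this way, and the bucket/paths are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("read-multipart"))
// "myfile.txt" is really a directory of part-* files; textFile on that path reads them all
val lines = sc.textFile("s3n://mybucket/mydir/myfile.txt")
println(lines.count())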
On 06/05/2014 21:19, kamatsuoka ken...@gmail.com wrote:

 I have a Spark app that writes out a file, s3://mybucket/mydir/myfile.txt.

 Behind the scenes, the S3 driver creates a bunch of files like
 s3://mybucket//mydir/myfile.txt/part-, as well as the block files like
 s3://mybucket/block_3574186879395643429.

 How do I construct a URL to use this file as input to another Spark app? I
 tried all the variations of s3://mybucket/mydir/myfile.txt, but none of them
 work.





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-a-multipart-s3-file-tp5463.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



customized comparator in groupByKey

2014-05-06 Thread Ameet Kini
I'd like to override the logic of comparing keys for equality in
groupByKey. Kinda like how combineByKey allows you to pass in the combining
logic for values, I'd like to do the same for keys.

My code looks like this:
val res = rdd.groupBy(myPartitioner)
Here, rdd is of type RDD[(MyKey, MyValue)], so res turns out to be of type
RDD[(MyKey, Seq[MyValue])]

MyKey is defined as case class MyKey(field1: Int, field2: Int),
and in myPartitioner's getPartition(key: Any) the key is of type MyKey; the
partitioning logic is an expression on both field1 and field2.

I'm guessing the groupBy uses equals to compare like instances of MyKey.
Currently, the equals method of MyKey uses both field1 and field2, as
would be natural to its implementation. However, I'd like to have the
groupBy only use field1. Any pointers on how I can go about doing it?

One way is the following, but I'd like to avoid creating all those MyNewKey
objects:
val partitionedRdd = rdd.partitionBy(myPartitioner)
val mappedRdd = partitionedRdd.mapPartitions(partition =>
  partition.map { case (myKey, myValue) => (new MyNewKey(myKey.field1), myValue) },
  preservesPartitioning = true)
val groupedRdd = mappedRdd.groupByKey()
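
Another sketch I'm considering (untested), keying directly on field1 so no
custom key class is needed at all:

// Key by field1 only; keep the full original key next to the value so nothing is lost
val grouped = rdd
  .map { case (k, v) => (k.field1, (k, v)) }
  .groupByKey()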


Thanks,
Ameet