Spark Streaming to capture packets from interface

2014-06-27 Thread swezzz
Hi, I am new to Spark. Is it possible to capture live packets from a
network interface through Spark Streaming? Is there a library or any built-in
classes to bind to the network interface directly?
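
For illustration: as far as I know Spark Streaming has no built-in source that
binds to a network interface, so the usual route is a custom Receiver wrapping a
capture library (pcap4j is named here purely as an assumption). A minimal Scala
sketch, with a plain DatagramSocket standing in for the real capture handle:

import java.net.{DatagramPacket, DatagramSocket}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Sketch of a custom receiver. The DatagramSocket below is only a stand-in:
// a real packet-capture receiver would open the interface with a capture
// library (e.g. pcap4j, an assumption, not something Spark ships) and call
// store() for every captured packet.
class PacketReceiver(port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    // Capture on its own thread so onStart() returns immediately.
    new Thread("packet-capture") {
      override def run() { capture() }
    }.start()
  }

  def onStop() {
    // Nothing to do: the capture loop checks isStopped() between packets.
  }

  private def capture() {
    val socket = new DatagramSocket(port) // replace with the capture handle
    val buffer = new Array[Byte](65535)
    try {
      while (!isStopped()) {
        val packet = new DatagramPacket(buffer, buffer.length)
        socket.receive(packet)
        store(new String(packet.getData, 0, packet.getLength))
      }
    } finally {
      socket.close()
    }
  }
}

// Usage: val packets = ssc.receiverStream(new PacketReceiver(9999))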



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-to-capture-packets-from-interface-tp8399.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Map with filter on JavaRdd

2014-06-27 Thread ajay garg
Hi All,
 Is it possible to map and filter a JavaRDD in a single operation?

Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Map-with-filter-on-JavaRdd-tp8401.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Map with filter on JavaRdd

2014-06-27 Thread Mayur Rustagi
It happens in a single operation anyway. You may write them separately, but
the stages are pipelined together where possible; you will see only a single
stage for both in your application's output.
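
For illustration (Scala API shown; the JavaRDD version behaves the same way),
the two transformations below are written as separate calls, yet Spark pipelines
them into one stage and applies them in a single pass over each partition.
Assumes an existing SparkContext sc:

val numbers = sc.parallelize(1 to 100)

// Written as two separate transformations...
val doubledEvens = numbers.map(_ * 2).filter(_ % 4 == 0)

// ...but both run inside the same stage/tasks when an action triggers them.
doubledEvens.count()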

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Fri, Jun 27, 2014 at 12:12 PM, ajay garg ajay.g...@mobileum.com wrote:

 Hi All,
  Is it possible to map and filter a javardd in a single operation?

 Thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Map-with-filter-on-JavaRdd-tp8401.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: org.jboss.netty.channel.ChannelException: Failed to bind to: master/1xx.xx..xx:0

2014-06-27 Thread MEETHU MATHEW
Hi Akhil,

The IP is correct, and we are able to start the workers when we start them with the java
command directly. It becomes 192.168.125.174:0 when we call it from the scripts.


 
Thanks & Regards, 
Meethu M


On Friday, 27 June 2014 1:49 PM, Akhil Das ak...@sigmoidanalytics.com wrote:
 


why is it binding to port 0? 192.168.125.174:0 :/

Check the IP address of that master machine (ifconfig); it looks like the IP 
address has been changed (hoping you are running these machines on a LAN).


Thanks
Best Regards


On Fri, Jun 27, 2014 at 12:00 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote:

Hi all,


My Spark (standalone mode) was running fine till yesterday. But now I am getting 
the following exception when I run start-slaves.sh or start-all.sh:


slave3: failed to launch org.apache.spark.deploy.worker.Worker:
slave3:   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
slave3:   at java.lang.Thread.run(Thread.java:662)


The log files have the following lines:


14/06/27 11:06:30 INFO SecurityManager: Using Spark's default log4j profile: 
org/apache/spark/log4j-defaults.properties
14/06/27 11:06:30 INFO SecurityManager: Changing view acls to: hduser
14/06/27 11:06:30 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(hduser)
14/06/27 11:06:30 INFO Slf4jLogger: Slf4jLogger started
14/06/27 11:06:30 INFO Remoting: Starting remoting
Exception in thread main org.jboss.netty.channel.ChannelException: Failed to 
bind to: master/192.168.125.174:0
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
...
Caused by: java.net.BindException: Cannot assign requested address
...
I saw the same error reported before and have tried the following solutions.


I set the variable SPARK_LOCAL_IP and changed SPARK_MASTER_PORT to a different 
number, but nothing is working.


When I try to start the worker from the respective machines using the 
following java command, it runs without any exception:


java -cp 
::/usr/local/spark-1.0.0/conf:/usr/local/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.2.1.jar
 -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m 
org.apache.spark.deploy.worker.Worker spark://:master:7077



Somebody please give a solution
 
Thanks & Regards, 
Meethu M

Issue in using classes with constructor as vertex attribute in graphx

2014-06-27 Thread harsh2005_7
Hi,

I have a scenario where I have a class X with constructor parameters of type
(RDD, Double). When I initialize the class object (named, say, x1) with the
corresponding RDD and double value and put it as a vertex attribute
in a graph, I lose my RDD value. The Double value remains intact. I
tried simultaneously accessing the RDD from the instance variable (x1) and I see
it intact there, but for some reason it's not available when I take the graph
vertex attribute and access the RDD. Please help me understand which
concept I am missing here, and what's the correct way to do it.

Regards,
Harsh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issue-in-using-classes-with-constructor-as-vertex-attribute-in-graphx-tp8407.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: [GraphX] Cast error when comparing a vertex attribute after its type has changed

2014-06-27 Thread Pierre-Alexandre Fonta
Thanks for having corrected this bug!

The fix version is marked as 1.1.0 ( SPARK-1552
https://issues.apache.org/jira/browse/SPARK-1552  ). I have tested my code
snippet with Spark 1.0.0 (Scala 2.10.4) and it works. I don't know if it's
important to mention it.

Pierre-Alexandre



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Cast-error-when-comparing-a-vertex-attribute-after-its-type-has-changed-tp4119p8408.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Map with filter on JavaRdd

2014-06-27 Thread ajay garg
Thanks Mayur for the clarification.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Map-with-filter-on-JavaRdd-tp8401p8410.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Improving Spark multithreaded performance?

2014-06-27 Thread anoldbrain
I have not used this, only watched a presentation of it in spark summit 2013.

https://github.com/radlab/sparrow
https://spark-summit.org/talk/ousterhout-next-generation-spark-scheduling-with-sparrow/

This is pure conjecture based on your high scheduling latency and the size of your
cluster, but it seems like one avenue to look at.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Improving-Spark-multithreaded-performance-tp8359p8411.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Fine-grained mesos execution hangs on Debian 7.4

2014-06-27 Thread Fedechicco
Hello Sebastien,

it is not working with the 1.0 branch either.
I decided to compile spark from source precisely because of the
[SPARK-2204] fix, because before that I couldn't get fine-grained working
at all.
Now it works fine if the cluster is only composed of Ubuntu 14.04 nodes,
and when I introduce the Debian 7.4 nodes they hang like described.

I tested both the master (cloned with the [SPARK-2204] fix already inside)
and the 1.0 branch with that commit cherry-picked inside.
The behaviour is the same: whatever the reason is, it has not been
introduced after the 1.0 release.

Did anybody else test fine-grained with a Debian 7 or 7.4?




2014-06-26 19:23 GMT+00:00 Sébastien Rainville sebastienrainvi...@gmail.com
:

 Hello Federico,

 is it working with the 1.0 branch? In either branch, make sure that you
 have this commit:
 https://github.com/apache/spark/commit/1132e472eca1a00c2ce10d2f84e8f0e79a5193d3
 I never saw the behavior you are describing, but that commit is important
 if you are running in fine-grained mode, and it was merged only yesterday.

 - Sebastien



 On Thu, Jun 26, 2014 at 12:11 PM, Fedechicco fedechi...@gmail.com wrote:

 Hello,

 as per the subject, when I run the Scala spark-shell on our Mesos (0.19) cluster
 some Spark slaves just hang at the end of the staging phase for any given
 computation.

 The cluster has mixed OSes (Ubuntu 14.04 / Debian 7.4), but if I run the
 same shell and commands using coarse grained mode everything works just
 fine.

 I'm using a spark 1.1.0-SNAPSHOT built from sources (pulled today from
 git), on openjdk-7.

 Sadly I can't get any error message from the sandboxes for the hanging
 slaves, everything seems in order, just stuck.

 Any suggestion on how to debug this?

 thanks,
 Federico





Re: Spark vs Google cloud dataflow

2014-06-27 Thread Martin Goodson
My experience is that gaining 20 spot instances accounts for a tiny
fraction of the total time of provisioning a cluster with spark-ec2. This
is not (solely) an AWS issue.


-- 
Martin Goodson  |  VP Data Science
(0)20 3397 1240


On Thu, Jun 26, 2014 at 10:14 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Hmm, I remember a discussion on here about how the way in which spark-ec2
 rsyncs stuff to the cluster for setup could be improved, and I’m assuming
 there are other such improvements to be made. Perhaps those improvements
 don’t matter much when compared to EC2 instance launch times, but I’m not
 sure.
 ​


 On Thu, Jun 26, 2014 at 4:48 PM, Aureliano Buendia buendia...@gmail.com
 wrote:




 On Thu, Jun 26, 2014 at 9:42 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:


 That’s technically true, but I’d be surprised if there wasn’t a lot of
 room for improvement in spark-ec2 regarding cluster launch+config
 times.

 Unfortunately, this is not a spark-ec2 issue, but an AWS one. Starting a
 few months ago, Amazon AWS services have been having bigger and bigger
 lags. Indeed, the default timeout hard-coded in spark-ec2 is no longer
 able to launch the cluster successfully, and many people here reported that
 they had to increase it.


 ​






Re: Spark vs Google cloud dataflow

2014-06-27 Thread Sean Owen
On Thu, Jun 26, 2014 at 9:15 AM, Aureliano Buendia buendia...@gmail.com wrote:
 Summingbird is for map/reduce. Dataflow is the third generation of google's
 map/reduce, and it generalizes map/reduce the way Spark does. See more about
 this here: http://youtu.be/wtLJPvx7-ys?t=2h37m8s

Yes, my point was that Summingbird is similar in that it is a
higher-level service for batch/streaming computation, not that it is
similar for being MapReduce-based.

 It seems Dataflow is based on this paper:
 http://pages.cs.wisc.edu/~akella/CS838/F12/838-CloudPapers/FlumeJava.pdf

FlumeJava maps to Crunch in the Hadoop ecosystem. I think Dataflows is
more than that but yeah that seems to be some of the 'language'. It is
similar in that it is a distributed collection abstraction.


Re: Spark vs Google cloud dataflow

2014-06-27 Thread Dean Wampler
... and to be clear on the point, Summingbird is not limited to MapReduce.
It abstracts over Scalding (which abstracts over Cascading, which is being
moved from MR to Spark) and over Storm for event processing.


On Fri, Jun 27, 2014 at 7:16 AM, Sean Owen so...@cloudera.com wrote:

 On Thu, Jun 26, 2014 at 9:15 AM, Aureliano Buendia buendia...@gmail.com
 wrote:
  Summingbird is for map/reduce. Dataflow is the third generation of
 google's
  map/reduce, and it generalizes map/reduce the way Spark does. See more
 about
  this here: http://youtu.be/wtLJPvx7-ys?t=2h37m8s

 Yes, my point was that Summingbird is similar in that it is a
 higher-level service for batch/streaming computation, not that it is
 similar for being MapReduce-based.

  It seems Dataflow is based on this paper:
  http://pages.cs.wisc.edu/~akella/CS838/F12/838-CloudPapers/FlumeJava.pdf

 FlumeJava maps to Crunch in the Hadoop ecosystem. I think Dataflows is
 more than that but yeah that seems to be some of the 'language'. It is
 similar in that it is a distributed collection abstraction.




-- 
Dean Wampler, Ph.D.
Typesafe
@deanwampler
http://typesafe.com
http://polyglotprogramming.com


How to use .newAPIHadoopRDD() from Java (w/ Cassandra)

2014-06-27 Thread Martin Gammelsæter
Hello!

I have just started trying out Spark to see if it fits my needs, but I
am running into some issues when trying to port the
CassandraCQLTest.scala example into Java. The specific errors etc.
that I encounter can be seen here:

http://stackoverflow.com/questions/24450540/how-to-use-sparks-newapihadooprdd-java-equivalent-of-scalas-classof

where I have also asked the same question. Any pointers on how to use
.newAPIHadoopRDD() and CqlPagingInputFormat from Java are greatly
appreciated! (Either here or on Stack Overflow)

-- 
Best regards,
Martin Gammelsæter


Re: Spark standalone network configuration problems

2014-06-27 Thread Shannon Quinn
I put the settings as you specified in spark-env.sh for the master. When 
I run start-all.sh, the web UI shows both the worker on the master 
(machine1) and the slave worker (machine2) as ALIVE and ready, with the 
master URL at spark://192.168.1.101. However, when I run spark-submit, 
it immediately crashes with


py4j.protocol.Py4JJavaError14/06/27 09:01:32 ERROR Remoting: Remoting 
error: [Startup failed]

akka.remote.RemoteTransportException: Startup failed
[...]
org.jboss.netty.channel.ChannelException: Failed to bind to 
/192.168.1.101:5060

[...]
java.net.BindException: Address already in use.
[...]

This seems entirely contrary to intuition; why would Spark be unable to 
bind to the exact IP:port set for the master?


On 6/27/14, 1:54 AM, Akhil Das wrote:

Hi Shannon,

How about a setting like the following? (just removed the quotes)

export SPARK_MASTER_IP=192.168.1.101
export SPARK_MASTER_PORT=5060
#export SPARK_LOCAL_IP=127.0.0.1

Not sure whats happening in your case, it could be that your system is 
not able to bind to 192.168.1.101 address. What is the spark:// master 
url that you are seeing there in the webUI? (It should be 
spark://192.168.1.101:7077 in your case).




Thanks
Best Regards


On Fri, Jun 27, 2014 at 5:47 AM, Shannon Quinn squ...@gatech.edu 
mailto:squ...@gatech.edu wrote:


In the interest of completeness, this is how I invoke spark:

[on master]

 sbin/start-all.sh
 spark-submit --py-files extra.py main.py

iPhone'd

On Jun 26, 2014, at 17:29, Shannon Quinn squ...@gatech.edu
mailto:squ...@gatech.edu wrote:


My *best guess* (please correct me if I'm wrong) is that the
master (machine1) is sending the command to the worker (machine2)
with the localhost argument as-is; that is, machine2 isn't doing
any weird address conversion on its end.

Consequently, I've been focusing on the settings of the
master/machine1. But I haven't found anything to indicate where
the localhost argument could be coming from. /etc/hosts lists
only 127.0.0.1 as localhost; spark-defaults.conf list
spark.master as the full IP address (not 127.0.0.1); spark-env.sh
on the master also lists the full IP under SPARK_MASTER_IP. The
*only* place on the master where it's associated with localhost
is SPARK_LOCAL_IP.

In looking at the logs of the worker spawned on master, it's also
receiving a spark://localhost:5060 argument, but since it
resides on the master that works fine. Is it possible that the
master is, for some reason, passing
spark://{SPARK_LOCAL_IP}:5060 to the workers?

That was my motivation behind commenting out SPARK_LOCAL_IP;
however, that's when the master crashes immediately due to the
address already being in use.

Any ideas? Thanks!

Shannon

On 6/26/14, 10:14 AM, Akhil Das wrote:

Can you paste your spark-env.sh file?

Thanks
Best Regards


On Thu, Jun 26, 2014 at 7:01 PM, Shannon Quinn
squ...@gatech.edu mailto:squ...@gatech.edu wrote:

Both /etc/hosts have each other's IP addresses in them.
Telneting from machine2 to machine1 on port 5060 works just
fine.

Here's the output of lsof:

user@machine1:~/spark/spark-1.0.0-bin-hadoop2$
mailto:user@machine1:%7E/spark/spark-1.0.0-bin-hadoop2$
lsof -i:5060
COMMAND   PID   USER   FD   TYPE   DEVICE SIZE/OFF NODE NAME
java    23985 user   30u  IPv6 11092354  0t0  TCP
machine1:sip (LISTEN)
java    23985 user   40u  IPv6 11099560  0t0  TCP
machine1:sip->machine1:48315 (ESTABLISHED)
java    23985 user   52u  IPv6 11100405  0t0  TCP
machine1:sip->machine2:54476 (ESTABLISHED)
java    24157 user   40u  IPv6 11092413  0t0  TCP
machine1:48315->machine1:sip (ESTABLISHED)

Ubuntu seems to recognize 5060 as the standard port for
sip; it's not actually running anything there besides
Spark, it just does a s/5060/sip/g.

Is there something to the fact that every time I comment out
SPARK_LOCAL_IP in spark-env, it crashes immediately upon
spark-submit due to the address already being in use? Or
am I barking up the wrong tree on that one?

Thanks again for all your help; I hope we can knock this one
out.

Shannon


On 6/26/14, 9:13 AM, Akhil Das wrote:

Do you have ip machine1 in your workers
/etc/hosts also? If so try telneting from your machine2 to
machine1 on port 5060. Also make sure nothing else is
running on port 5060 other than Spark (*/lsof -i:5060/*)

Thanks
Best Regards


On Thu, Jun 26, 2014 at 6:35 PM, Shannon Quinn
squ...@gatech.edu mailto:squ...@gatech.edu wrote:

Still running into the same problem. /etc/hosts on the
master says

127.0.0.1    localhost
ip machine1

ip is the 

Re: Spark standalone network configuration problems

2014-06-27 Thread Shannon Quinn
No joy, unfortunately. Same issue; see my previous email--still crashes 
with address already in use.


On 6/27/14, 1:54 AM, sujeetv wrote:

Try to explicitly set the spark.driver.host property to the master's
IP.
Sujeet



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-network-configuration-problems-tp8304p8396.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark standalone network configuration problems

2014-06-27 Thread Shannon Quinn
Sorry, master spark URL in the web UI is *spark://192.168.1.101:5060*, 
exactly as configured.


On 6/27/14, 9:07 AM, Shannon Quinn wrote:
I put the settings as you specified in spark-env.sh for the master. 
When I run start-all.sh, the web UI shows both the worker on the 
master (machine1) and the slave worker (machine2) as ALIVE and ready, 
with the master URL at spark://192.168.1.101. However, when I run 
spark-submit, it immediately crashes with


py4j.protocol.Py4JJavaError14/06/27 09:01:32 ERROR Remoting: Remoting 
error: [Startup failed]

akka.remote.RemoteTransportException: Startup failed
[...]
org.jboss.netty.channel.ChannelException: Failed to bind to 
/192.168.1.101:5060

[...]
java.net.BindException: Address already in use.
[...]

This seems entirely contrary to intuition; why would Spark be unable 
to bind to the exact IP:port set for the master?


On 6/27/14, 1:54 AM, Akhil Das wrote:

Hi Shannon,

How about a setting like the following? (just removed the quotes)

export SPARK_MASTER_IP=192.168.1.101
export SPARK_MASTER_PORT=5060
#export SPARK_LOCAL_IP=127.0.0.1

Not sure whats happening in your case, it could be that your system 
is not able to bind to 192.168.1.101 address. What is the spark:// 
master url that you are seeing there in the webUI? (It should be 
spark://192.168.1.101:7077 in your case).




Thanks
Best Regards


On Fri, Jun 27, 2014 at 5:47 AM, Shannon Quinn squ...@gatech.edu 
mailto:squ...@gatech.edu wrote:


In the interest of completeness, this is how I invoke spark:

[on master]

 sbin/start-all.sh
 spark-submit --py-files extra.py main.py

iPhone'd

On Jun 26, 2014, at 17:29, Shannon Quinn squ...@gatech.edu
mailto:squ...@gatech.edu wrote:


My *best guess* (please correct me if I'm wrong) is that the
master (machine1) is sending the command to the worker
(machine2) with the localhost argument as-is; that is, machine2
isn't doing any weird address conversion on its end.

Consequently, I've been focusing on the settings of the
master/machine1. But I haven't found anything to indicate where
the localhost argument could be coming from. /etc/hosts lists
only 127.0.0.1 as localhost; spark-defaults.conf list
spark.master as the full IP address (not 127.0.0.1);
spark-env.sh on the master also lists the full IP under
SPARK_MASTER_IP. The *only* place on the master where it's
associated with localhost is SPARK_LOCAL_IP.

In looking at the logs of the worker spawned on master, it's
also receiving a spark://localhost:5060 argument, but since it
resides on the master that works fine. Is it possible that the
master is, for some reason, passing
spark://{SPARK_LOCAL_IP}:5060 to the workers?

That was my motivation behind commenting out SPARK_LOCAL_IP;
however, that's when the master crashes immediately due to the
address already being in use.

Any ideas? Thanks!

Shannon

On 6/26/14, 10:14 AM, Akhil Das wrote:

Can you paste your spark-env.sh file?

Thanks
Best Regards


On Thu, Jun 26, 2014 at 7:01 PM, Shannon Quinn
squ...@gatech.edu mailto:squ...@gatech.edu wrote:

Both /etc/hosts have each other's IP addresses in them.
Telneting from machine2 to machine1 on port 5060 works just
fine.

Here's the output of lsof:

user@machine1:~/spark/spark-1.0.0-bin-hadoop2$
mailto:user@machine1:%7E/spark/spark-1.0.0-bin-hadoop2$
lsof -i:5060
COMMAND   PID   USER   FD   TYPE   DEVICE SIZE/OFF NODE NAME
java    23985 user   30u  IPv6 11092354  0t0  TCP
machine1:sip (LISTEN)
java    23985 user   40u  IPv6 11099560  0t0  TCP
machine1:sip->machine1:48315 (ESTABLISHED)
java    23985 user   52u  IPv6 11100405  0t0  TCP
machine1:sip->machine2:54476 (ESTABLISHED)
java    24157 user   40u  IPv6 11092413  0t0  TCP
machine1:48315->machine1:sip (ESTABLISHED)

Ubuntu seems to recognize 5060 as the standard port for
sip; it's not actually running anything there besides
Spark, it just does a s/5060/sip/g.

Is there something to the fact that every time I comment
out SPARK_LOCAL_IP in spark-env, it crashes immediately
upon spark-submit due to the address already being in
use? Or am I barking up the wrong tree on that one?

Thanks again for all your help; I hope we can knock this
one out.

Shannon


On 6/26/14, 9:13 AM, Akhil Das wrote:

Do you have ip machine1 in your workers
/etc/hosts also? If so try telneting from your machine2 to
machine1 on port 5060. Also make sure nothing else is
running on port 5060 other than Spark (*/lsof -i:5060/*)

Thanks
Best Regards


On Thu, Jun 26, 2014 at 6:35 PM, Shannon Quinn
squ...@gatech.edu mailto:squ...@gatech.edu wrote:

Still running 

Spark RDD member of class loses its value when the class is used as a graph attribute

2014-06-27 Thread harsh2005_7
Hi,

I have a scenario where I have a class X with constructor parameters of type
(RDD, Double). When I initialize the class object (named, say, x1) with the
corresponding RDD and double value and *put it as a vertex attribute
in a graph*, I lose my RDD value. The Double value remains intact. I
tried simultaneously accessing the RDD from the instance variable (x1) and I see
it intact there, but for some reason it's not available when I take the graph
vertex attribute and access the RDD. Please help me understand which
concept I am missing here, and what's the correct way to do it.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-RDD-member-of-class-loses-it-s-value-when-the-class-being-used-as-graph-attribute-tp8420.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


problem when start spark streaming in cluster mode

2014-06-27 Thread Siyuan he
Hi all,

I can start a spark streaming app in Client mode on a Pseudo-standalone
cluster on my local machine.

However, when I try to start it in Cluster mode, it always gets the
following exception on the driver.

Exception in thread main akka.ConfigurationException: Could not
start logger due to [akka.ConfigurationException: Logger specified in
config can't be loaded [akka.event.slf4j.Slf4jLogger] due to
[akka.event.Logging$LoggerInitializationException: Logger
log1-Slf4jLogger did not respond with LoggerInitialized, sent instead
[TIMEOUT]]]

Can someone help?

Thanks,
siyuan


Re: ElasticSearch enrich

2014-06-27 Thread boci
Another question. In the foreachRDD I will initialize the JobConf, but at
that point how can I get information from the items?
I have an identifier in the data which identifies the required ES index (so
how can I set a dynamic index in the foreachRDD)?

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com


On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau hol...@pigscanfly.ca wrote:

 Just your luck I happened to be working on that very talk today :) Let me
 know how your experiences with Elasticsearch & Spark go :)


 On Thu, Jun 26, 2014 at 3:17 PM, boci boci.b...@gmail.com wrote:

 Wow, thanks for your fast answer, it helps a lot...

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau hol...@pigscanfly.ca
 wrote:

 Hi b0c1,

 I have an example of how to do this in the repo for my talk as well, the
 specific example is at
 https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
 . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
 then call  saveAsHadoopDataset on the RDD that gets passed into the
 function we provide to foreachRDD.

 e.g.

 stream.foreachRDD{(data, time) =>
  val jobConf = ...
  data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
 }

 Hope that helps :)

 Cheers,

 Holden :)


 On Thu, Jun 26, 2014 at 2:23 PM, boci boci.b...@gmail.com wrote:

 Thanks. Without the local option I can connect to ES remotely; now I only
 have one problem. How can I use elasticsearch-hadoop with Spark Streaming?
 I mean DStream doesn't have a saveAsHadoopFiles method, and my second problem is that
 the output index depends on the input data.

 Thanks


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:

 You can just add elasticsearch-hadoop as a dependency to your project
 to user the ESInputFormat and ESOutputFormat (
 https://github.com/elasticsearch/elasticsearch-hadoop). Some other
 basics here:
 http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html

 For testing, yes I think you will need to start ES in local mode (just
 ./bin/elasticsearch) and use the default config (host = localhost, port =
 9200).


 On Thu, Jun 26, 2014 at 9:04 AM, boci boci.b...@gmail.com wrote:

 That's okay, but Hadoop has ES integration. What happens if I run
 saveAsHadoopFile without Hadoop (or do I need to pull up Hadoop
 programmatically, if I can)?

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau hol...@pigscanfly.ca
 wrote:



 On Wed, Jun 25, 2014 at 4:16 PM, boci boci.b...@gmail.com wrote:

 Hi guys, thanks for the direction; now I have some problems/questions:
 - in local (test) mode I want to use ElasticClient.local to create the
 ES connection, but in production I want to use ElasticClient.remote. For
 this
 I want to pass the ElasticClient to mapPartitions, or what is the best
 practice?

 In this case you probably want to create the ElasticClient inside of
 mapPartitions (since it isn't serializable), and if you want to use a
 different client in local mode, just have a flag that controls what type of
 client you create.
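
A rough sketch of that pattern, shown with foreachPartition (the side-effecting
cousin of mapPartitions); the Client trait and createClient helper below are
hypothetical stand-ins for ElasticClient.local / ElasticClient.remote, since only
the placement of the client matters here:

import org.apache.spark.rdd.RDD

object EsIndexing {
  // Hypothetical stand-ins for the real ElasticClient (local vs. remote):
  trait Client { def index(doc: String): Unit; def close(): Unit }
  def createClient(useLocal: Boolean): Client = ???  // flag decides local or remote

  def indexPartitions(rdd: RDD[String], useLocal: Boolean): Unit = {
    rdd.foreachPartition { docs =>
      // Built on the executor, once per partition, so it is never serialized.
      val client = createClient(useLocal)
      try docs.foreach(client.index) finally client.close()
    }
  }
}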

 - my stream output is written into elasticsearch. How can I
 test output.saveAsHadoopFile[ESOutputFormat](-) in a local environment?


 - After storing the enriched data in ES, I want to generate
 aggregated data (EsInputFormat); how can I test that locally?

 I think the simplest thing to do would be to use the same client in
 both modes and just start a single-node Elasticsearch cluster.


 Thanks guys

 b0c1




 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau hol...@pigscanfly.ca
  wrote:

 So I'm giving a talk at the Spark summit on using Spark &
 ElasticSearch, but for now if you want to see a simple demo which uses
 elasticsearch for geo input you can take a look at my quick & dirty
 implementation with TopTweetsInALocation (
 https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
 ). This approach uses the ESInputFormat which avoids the difficulty of
 having to manually create ElasticSearch clients.

 This approach might not work for your data, e.g. if you need to
 create a query for 

Re: numpy + pyspark

2014-06-27 Thread Avishek Saha
I too felt the same Nick but I don't have root privileges on the cluster,
unfortunately. Are there any alternatives?


On 27 June 2014 08:04, Nick Pentreath nick.pentre...@gmail.com wrote:

 I've not tried this - but numpy is a tricky and complex package with many
 dependencies on Fortran/C libraries etc. I'd say by the time you figure out
 correctly deploying numpy in this manner, you may as well have just built
 it into your cluster bootstrap process, or PSSH install it on each node...


 On Fri, Jun 27, 2014 at 4:58 PM, Avishek Saha avishek.s...@gmail.com
 wrote:

 To clarify I tried it and it almost worked -- but I am getting some
 problems from the Random module in numpy. If anyone has successfully passed
 a numpy module (via the --py-files option) to spark-submit then please let
 me know.

 Thanks !!
 Avishek


 On 26 June 2014 17:45, Avishek Saha avishek.s...@gmail.com wrote:

 Hi all,

 Instead of installing numpy in each worker node, is it possible to
 ship numpy (via --py-files option maybe) while invoking the
 spark-submit?

 Thanks,
 Avishek






Re: numpy + pyspark

2014-06-27 Thread Shannon Quinn
Would deploying virtualenv on each directory on the cluster be viable? 
The dependencies would get tricky but I think this is the sort of 
situation it's built for.


On 6/27/14, 11:06 AM, Avishek Saha wrote:
I too felt the same Nick but I don't have root privileges on the 
cluster, unfortunately. Are there any alternatives?



On 27 June 2014 08:04, Nick Pentreath nick.pentre...@gmail.com 
mailto:nick.pentre...@gmail.com wrote:


I've not tried this - but numpy is a tricky and complex package
with many dependencies on Fortran/C libraries etc. I'd say by the
time you figure out correctly deploying numpy in this manner, you
may as well have just built it into your cluster bootstrap
process, or PSSH install it on each node...


On Fri, Jun 27, 2014 at 4:58 PM, Avishek Saha
avishek.s...@gmail.com mailto:avishek.s...@gmail.com wrote:

To clarify I tried it and it almost worked -- but I am getting
some problems from the Random module in numpy. If anyone has
successfully passed a numpy module (via the --py-files option)
to spark-submit then please let me know.

Thanks !!
Avishek


On 26 June 2014 17:45, Avishek Saha avishek.s...@gmail.com
mailto:avishek.s...@gmail.com wrote:

Hi all,

Instead of installing numpy in each worker node, is it
possible to
ship numpy (via --py-files option maybe) while invoking the
spark-submit?

Thanks,
Avishek








Re: numpy + pyspark

2014-06-27 Thread Shannon Quinn
I suppose along those lines, there's also Anaconda: 
https://store.continuum.io/cshop/anaconda/


On 6/27/14, 11:13 AM, Nick Pentreath wrote:
Hadoopy uses http://www.pyinstaller.org/ to package things up into an 
executable that should be runnable without root privileges. It says it 
supports numpy.



On Fri, Jun 27, 2014 at 5:08 PM, Shannon Quinn squ...@gatech.edu 
mailto:squ...@gatech.edu wrote:


Would deploying virtualenv on each directory on the cluster be
viable? The dependencies would get tricky but I think this is the
sort of situation it's built for.


On 6/27/14, 11:06 AM, Avishek Saha wrote:

I too felt the same Nick but I don't have root privileges on the
cluster, unfortunately. Are there any alternatives?


On 27 June 2014 08:04, Nick Pentreath nick.pentre...@gmail.com
mailto:nick.pentre...@gmail.com wrote:

I've not tried this - but numpy is a tricky and complex
package with many dependencies on Fortran/C libraries etc.
I'd say by the time you figure out correctly deploying numpy
in this manner, you may as well have just built it into your
cluster bootstrap process, or PSSH install it on each node...


On Fri, Jun 27, 2014 at 4:58 PM, Avishek Saha
avishek.s...@gmail.com mailto:avishek.s...@gmail.com wrote:

To clarify I tried it and it almost worked -- but I am
getting some problems from the Random module in numpy. If
anyone has successfully passed a numpy module (via the
--py-files option) to spark-submit then please let me know.

Thanks !!
Avishek


On 26 June 2014 17:45, Avishek Saha
avishek.s...@gmail.com mailto:avishek.s...@gmail.com
wrote:

Hi all,

Instead of installing numpy in each worker node, is
it possible to
ship numpy (via --py-files option maybe) while
invoking the
spark-submit?

Thanks,
Avishek











Re: Using CQLSSTableWriter to batch load data from Spark to Cassandra.

2014-06-27 Thread Gerard Maas
I got an answer on SO to this question, basically confirming that the
CQLSSTableWriter cannot be used in Spark (at least in the form shown in the
code snippet). DataStax filed a bug on that, and it might get solved in a
future version.

As you have observed, a single writer can only be used in serial
(ConcurrentModificationExceptions will happen if you do not), and creating
multiple writers in the JVM fails due to static schema construction within
the Cassandra code that the SSTableWriter uses.

I'm not aware of any workaround other than to spawn multiple JVMs, each
writing to a separate directory.

We have filed a Cassandra JIRA ticket to address this issue.

https://issues.apache.org/jira/browse/CASSANDRA-7463 - Tupshin Harper
http://stackoverflow.com/users/881195/tupshin-harper

S.O. question:
http://stackoverflow.com/questions/24396902/using-cqlsstablewriter-concurrently/24455785#24455785




On Thu, Jun 26, 2014 at 11:03 AM, Rohit Rai ro...@tuplejump.com wrote:

 Hi Gerard,

 What versions of Spark, Hadoop, Cassandra and Calliope are you
 using? We never built Calliope for Hadoop 2, as we/our clients either don't use
 Hadoop in their deployments or use it only as the infra component for Spark,
 in which case H1/H2 doesn't make a difference for them.

 I know of at least one case where the user had built Calliope against 2.0
 and was using it happily. If you need assistance with it we are here to
 help. Feel free to reach out to me directly and we can work out a solution
 for you.

 Regards,
 Rohit


 *Founder & CEO, **Tuplejump, Inc.*
 
 www.tuplejump.com
 *The Data Engineering Platform*


 On Thu, Jun 26, 2014 at 12:44 AM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Thanks Nick.

 We used the CassandraOutputFormat through Calliope. The Calliope API
 makes the CassandraOutputFormat quite accessible  and is cool to work with.
  It worked fine at prototype level, but we had Hadoop version conflicts
 when we put it in our Spark environment (Using our Spark assembly compiled
 with CDH4.4). The conflict seems to be at the Cassandra-all lib level,
 which is compiled against a different hadoop version  (v1).

 We could not get round that issue. (Any pointers in that direction?)

 That's why I'm trying the direct CQLSSTableWriter way but it looks
 blocked as well.

  -kr, Gerard.




 On Wed, Jun 25, 2014 at 8:57 PM, Nick Pentreath nick.pentre...@gmail.com
  wrote:

 can you not use a Cassandra OutputFormat? Seems they have
 BulkOutputFormat. An example of using it with Hadoop is here:
 http://shareitexploreit.blogspot.com/2012/03/bulkloadto-cassandra-with-hadoop.html

 Using it with Spark will be similar to the examples:
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
 and
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala


 On Wed, Jun 25, 2014 at 8:44 PM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Hi,

 (My excuses for the cross-post from SO)

 I'm trying to create Cassandra SSTables from the results of a batch
 computation in Spark. Ideally, each partition should create the SSTable for
 the data it holds in order to parallelize the process as much as possible
 (and probably even stream it to the Cassandra ring as well)

 After the initial hurdles with the CQLSSTableWriter (like requiring
 the yaml file), I'm confronted now with this issue:




 java.lang.RuntimeException: Attempting to load already loaded column 
 family customer.rawts
 at org.apache.cassandra.config.Schema.load(Schema.java:347)
 at org.apache.cassandra.config.Schema.load(Schema.java:112)
 at 
 org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336)

 I'm creating a writer on each parallel partition like this:




 def store(rdd: RDD[Message]) = {
   rdd.foreachPartition( msgIterator => {
     val writer = CQLSSTableWriter.builder()
       .inDirectory("/tmp/cass")
       .forTable(schema)
       .using(insertSttmt).build()
     msgIterator.foreach(msg => {...})
   })}

 And if I'm reading the exception correctly, I can only create one
 writer per table in one JVM. Digging a bit further in the code, it looks
 like the Schema.load(...) singleton enforces that limitation.

 I guess writes to the writer will not be thread-safe, and even if
 they were, the contention that multiple threads would create by having all
 parallel tasks trying to dump a few GB of data to disk at the same time would
 defeat the purpose of using the SSTables for bulk upload anyway.

 So, are there ways to use the CQLSSTableWriter concurrently?

 If not, what is the next best option to load batch data at high
 throughput in Cassandra?

 Will the upcoming Spark-Cassandra integration help with this? (ie.
 should I just sit back, relax and the problem will solve itself?)

 Thanks,

 Gerard.







Re: Using CQLSSTableWriter to batch load data from Spark to Cassandra.

2014-06-27 Thread Gerard Maas
Hi Rohit,

Thanks for your message. We are currently on Spark 0.9.1, Cassandra 2.0.6
and Calliope GA  (Would love to try the pre-release version if you want
beta testers :-)   Our hadoop version is CDH4.4 and of course our spark
assembly is compiled against it.

We have got really interesting performance results from using Calliope and
will probably try to compile it against Hadoop 2. Compared to the DataStax
Java driver, out of the box, the Calliope lib gives us ~4.5x insert
performance with a higher network and cpu usage (which is what we want in
batch insert mode = fast)

With additional code optimizations using the DataStax driver, we were able
to reduce that gap to 2x but still Calliope was easier and faster to use.

Will you be attending the Spark Summit? I'll be around.

We'll be in touch in any case :-)

-kr, Gerard.



On Thu, Jun 26, 2014 at 11:03 AM, Rohit Rai ro...@tuplejump.com wrote:

 Hi Gerard,

 What versions of Spark, Hadoop, Cassandra and Calliope are you
 using? We never built Calliope for Hadoop 2, as we/our clients either don't use
 Hadoop in their deployments or use it only as the infra component for Spark,
 in which case H1/H2 doesn't make a difference for them.

 I know of at least one case where the user had built Calliope against 2.0
 and was using it happily. If you need assistance with it we are here to
 help. Feel free to reach out to me directly and we can work out a solution
 for you.

 Regards,
 Rohit


 *Founder & CEO, **Tuplejump, Inc.*
 
 www.tuplejump.com
 *The Data Engineering Platform*


 On Thu, Jun 26, 2014 at 12:44 AM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Thanks Nick.

 We used the CassandraOutputFormat through Calliope. The Calliope API
 makes the CassandraOutputFormat quite accessible  and is cool to work with.
  It worked fine at prototype level, but we had Hadoop version conflicts
 when we put it in our Spark environment (Using our Spark assembly compiled
 with CDH4.4). The conflict seems to be at the Cassandra-all lib level,
 which is compiled against a different hadoop version  (v1).

 We could not get round that issue. (Any pointers in that direction?)

 That's why I'm trying the direct CQLSSTableWriter way but it looks
 blocked as well.

  -kr, Gerard.




 On Wed, Jun 25, 2014 at 8:57 PM, Nick Pentreath nick.pentre...@gmail.com
  wrote:

 can you not use a Cassandra OutputFormat? Seems they have
 BulkOutputFormat. An example of using it with Hadoop is here:
 http://shareitexploreit.blogspot.com/2012/03/bulkloadto-cassandra-with-hadoop.html

 Using it with Spark will be similar to the examples:
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala
 and
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala


 On Wed, Jun 25, 2014 at 8:44 PM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Hi,

 (My excuses for the cross-post from SO)

 I'm trying to create Cassandra SSTables from the results of a batch
 computation in Spark. Ideally, each partition should create the SSTable for
 the data it holds in order to parallelize the process as much as possible
 (and probably even stream it to the Cassandra ring as well)

 After the initial hurdles with the CQLSSTableWriter (like requiring
 the yaml file), I'm confronted now with this issue:




 java.lang.RuntimeException: Attempting to load already loaded column 
 family customer.rawts
 at org.apache.cassandra.config.Schema.load(Schema.java:347)
 at org.apache.cassandra.config.Schema.load(Schema.java:112)
 at 
 org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336)

 I'm creating a writer on each parallel partition like this:




 def store(rdd: RDD[Message]) = {
   rdd.foreachPartition( msgIterator => {
     val writer = CQLSSTableWriter.builder()
       .inDirectory("/tmp/cass")
       .forTable(schema)
       .using(insertSttmt).build()
     msgIterator.foreach(msg => {...})
   })}

 And if I'm reading the exception correctly, I can only create one
 writer per table in one JVM. Digging a bit further in the code, it looks
 like the Schema.load(...) singleton enforces that limitation.

 I guess writes to the writer will not be thread-safe, and even if
 they were, the contention that multiple threads would create by having all
 parallel tasks trying to dump a few GB of data to disk at the same time would
 defeat the purpose of using the SSTables for bulk upload anyway.

 So, are there ways to use the CQLSSTableWriter concurrently?

 If not, what is the next best option to load batch data at high
 throughput in Cassandra?

 Will the upcoming Spark-Cassandra integration help with this? (ie.
 should I just sit back, relax and the problem will solve itself?)

 Thanks,

 Gerard.







Re: Integrate Spark Editor with Hue for source compiled installation of spark/spark-jobServer

2014-06-27 Thread Romain Rigaux
So far Spark Job Server does not work with Spark 1.0:
https://github.com/ooyala/spark-jobserver

So this works only with Spark 0.9 currently:
http://gethue.com/get-started-with-spark-deploy-spark-server-and-compute-pi-from-your-web-browser/

Romain



Romain


On Tue, Jun 24, 2014 at 9:04 AM, Sunita Arvind sunitarv...@gmail.com
wrote:

 Hello Experts,

 I am attempting to integrate the Spark Editor with Hue on CDH 5.0.1. I have the
 spark installation built manually from the sources for Spark 1.0.0. I am
 able to integrate this with Cloudera Manager.

 Background:
 ---
 We have a 3 node VM cluster with CDH5.0.1
 We required Spark 1.0.0 due to some features in it, so I did a

  yum remove spark-core spark-master spark-worker spark-python

  of the default spark0.9.0 and compiled spark1.0.0 from source:

 Downloaded the spark-trunk from

 git clone https://github.com/apache/spark.git
 cd spark
 SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true ./sbt/sbt assembly

  The spark-assembly-1.0.0-SNAPSHOT-hadoop2.2.0.jar was built and spark by
 itself seems to work well. I was even able to run a text file count.

 Current attempt:
 
 Referring to this article -
 http://gethue.com/a-new-spark-web-ui-spark-app/
 Now I am trying to add the Spark editor to Hue. AFAIK, this requires
 git clone https://github.com/ooyala/spark-jobserver.git
 cd spark-jobserver
 sbt
 re-start

 This was successful after a lot of struggle with the proxy settings.
 However, is this the job server itself? Will that mean the job server has
 to be started manually? I intend to have the Spark editor show up in the Hue
 web UI, and I am nowhere close. Can someone please help?

 Note, the 3 VMs are Linux CentOS. Not sure if setting something like this can
 be expected to work:

 [desktop]
 app_blacklist=


 Also, I have made the changes (via vim) to
 ./job-server/src/main/resources/application.conf as recommended; however,
 I do not expect this to impact Hue in any way.

 Also, I intend to let the editor stay available, not spawn it every time it
 is required.


 Thanks in advance.

 regards



Re: Map with filter on JavaRdd

2014-06-27 Thread Daniel Siegmann
If for some reason it would be easier to do your mapping and filtering in a
single function, you can also use RDD.flatMap (returning an empty sequence
is equivalent to a filter). But unless you have good reason you should have
a separate map and filter transform, as Mayur said.
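
For illustration, a sketch of the flatMap-as-map-plus-filter pattern (Scala API
shown; with JavaRDD the same idea goes through FlatMapFunction). Assumes an
existing SparkContext sc:

val lines = sc.parallelize(Seq("1", "two", "3"))

// Parse and filter in one pass: emit the parsed value, or nothing at all.
val numbers = lines.flatMap { s =>
  try Seq(s.toInt)                                      // the "map" part
  catch { case _: NumberFormatException => Seq.empty }  // the "filter" part
}

numbers.collect()  // Array(1, 3)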


On Fri, Jun 27, 2014 at 7:43 AM, ajay garg ajay.g...@mobileum.com wrote:

 Thanks Mayur for clarification..



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Map-with-filter-on-JavaRdd-tp8401p8410.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


problem when start spark streaming in cluster mode

2014-06-27 Thread Siyuan he
Hi all,

I can start a spark streaming app in Client mode on a Pseudo-standalone
cluster on my local machine.

However, when I try to start it in Cluster mode, it always gets the
following exception on the driver.

Exception in thread main akka.ConfigurationException: Could not
start logger due to [akka.ConfigurationException: Logger specified in
config can't be loaded [akka.event.slf4j.Slf4jLogger] due to
[akka.event.Logging$LoggerInitializationException: Logger
log1-Slf4jLogger did not respond with LoggerInitialized, sent instead
[TIMEOUT]]]

Can someone help?

Thanks,
siyuan


scopt.OptionParser

2014-06-27 Thread SK
Hi,

I tried to develop some code to use Logistic Regression, following the code
in BinaryClassification.scala in examples/mllib. My code compiles, but at
runtime it complains that the scopt/OptionParser class cannot be found. I have the
following import statement in my code:

import scopt.OptionParser


My sbt file contains the following dependencies:

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.0.0"

libraryDependencies += "com.github.scopt" %% "scopt" % "3.2.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

Is there anything else I need to do to include the OptionParser?

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/scopt-OptionParser-tp8436.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark job tracker.

2014-06-27 Thread abhiguruvayya
Hello Mayur,

Are you using the SparkListener interface from the Java API? I tried using it but was
unsuccessful, so I need a few more inputs.
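
In case it helps, a minimal sketch of registering a listener, shown with the
Scala API (I have not verified the Java side against 1.0) and assuming an
existing SparkContext sc:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted}

// Override only the callbacks you need; the others default to no-ops.
class ProgressListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
    println("Stage " + stageCompleted.stageInfo.stageId + " completed")
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println("Job " + jobEnd.jobId + " ended with result " + jobEnd.jobResult)
}

// sc.addSparkListener(new ProgressListener)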



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p8438.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark standalone network configuration problems

2014-06-27 Thread Shannon Quinn
For some reason, commenting out spark.driver.host and spark.driver.port 
fixed something...and broke something else (or at least revealed another 
problem). For reference, the only lines I have in my spark-defaults.conf 
now:


spark.app.name  myProg
spark.masterspark://192.168.1.101:5060
spark.executor.memory   8g
spark.files.overwrite   true

It starts up, but has problems with machine2. For some reason, machine2 
is having trouble communicating with *itself*. Here are the worker logs 
of one of the failures (there are 10 before it quits):


Spark assembly has been built with Hive, including Datanucleus jars on 
classpath
14/06/27 14:55:13 INFO ExecutorRunner: Launch command: java -cp 
::/home/spark/spark-1.0.0-bin-hadoop2/conf:/home/spark/spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar 
-XX:MaxPermSize=128m -Xms8192M -Xmx8192M 
org.apache.spark.executor.CoarseGrainedExecutorBackend 
akka.tcp://spark@machine1:46378/user/CoarseGrainedScheduler 7 
machine2 8 akka.tcp://sparkWorker@machine2:48019/user/Worker 
app-20140627144512-0001
14/06/27 14:56:54 INFO Worker: Executor app-20140627144512-0001/7 
finished with state FAILED message Command exited with code 1 exitStatus 1
14/06/27 14:56:54 INFO LocalActorRef: Message 
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] 
from Actor[akka://sparkWorker/deadLetters] to 
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%40130.49.226.148%3A53561-38#-1924573003] 
was not delivered. [10] dead letters encountered. This logging can be 
turned off or adjusted with configuration settings 
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
14/06/27 14:56:54 ERROR EndpointWriter: AssociationError 
[akka.tcp://sparkWorker@machine2:48019] - 
[akka.tcp://sparkExecutor@machine2:60949]: Error [Association failed 
with [akka.tcp://sparkExecutor@machine2:60949]] [
akka.remote.EndpointAssociationException: Association failed with 
[akka.tcp://sparkExecutor@machine2:60949]
Caused by: 
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: 
Connection refused: machine2/130.49.226.148:60949

]
14/06/27 14:56:54 INFO Worker: Asked to launch executor 
app-20140627144512-0001/8 for Funtown, USA
14/06/27 14:56:54 ERROR EndpointWriter: AssociationError 
[akka.tcp://sparkWorker@machine2:48019] - 
[akka.tcp://sparkExecutor@machine2:60949]: Error [Association failed 
with [akka.tcp://sparkExecutor@machine2:60949]] [
akka.remote.EndpointAssociationException: Association failed with 
[akka.tcp://sparkExecutor@machine2:60949]
Caused by: 
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: 
Connection refused: machine2/130.49.226.148:60949

]
14/06/27 14:56:54 ERROR EndpointWriter: AssociationError 
[akka.tcp://sparkWorker@machine2:48019] - 
[akka.tcp://sparkExecutor@machine2:60949]: Error [Association failed 
with [akka.tcp://sparkExecutor@machine2:60949]] [
akka.remote.EndpointAssociationException: Association failed with 
[akka.tcp://sparkExecutor@machine2:60949]
Caused by: 
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: 
Connection refused: machine2/130.49.226.148:60949

]

Port 48019 on machine2 is indeed open, connected, and listening. Any ideas?

Thanks!

Shannon

On 6/27/14, 1:54 AM, sujeetv wrote:

Try to explicitly set set the spark.driver.host property to the master's
IP.
Sujeet



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-network-configuration-problems-tp8304p8396.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark standalone network configuration problems

2014-06-27 Thread Sujeet Varakhedi
Looks like your driver is not able to connect to the remote executor on
machine2/130.49.226.148:60949. Can you check if the master machine can
route to 130.49.226.148?

Sujeet


On Fri, Jun 27, 2014 at 12:04 PM, Shannon Quinn squ...@gatech.edu wrote:

 For some reason, commenting out spark.driver.host and spark.driver.port
 fixed something...and broke something else (or at least revealed another
 problem). For reference, the only lines I have in my spark-defaults.conf
 now:

 spark.app.name  myProg
 spark.masterspark://192.168.1.101:5060
 spark.executor.memory   8g
 spark.files.overwrite   true

 It starts up, but has problems with machine2. For some reason, machine2 is
 having trouble communicating with *itself*. Here are the worker logs of one
 of the failures (there are 10 before it quits):


 Spark assembly has been built with Hive, including Datanucleus jars on
 classpath
 14/06/27 14:55:13 INFO ExecutorRunner: Launch command: java -cp
 ::/home/spark/spark-1.0.0-bin-hadoop2/conf:/home/spark/
 spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.
 2.0.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-
 rdbms-3.2.1.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/
 datanucleus-core-3.2.2.jar:/home/spark/spark-1.0.0-bin-
 hadoop2/lib/datanucleus-api-jdo-3.2.1.jar -XX:MaxPermSize=128m
 -Xms8192M -Xmx8192M 
 org.apache.spark.executor.CoarseGrainedExecutorBackend
 akka.tcp://spark@machine1:46378/user/CoarseGrainedScheduler 7
 machine2 8 akka.tcp://sparkWorker@machine2:48019/user/Worker
 app-20140627144512-0001
 14/06/27 14:56:54 INFO Worker: Executor app-20140627144512-0001/7 finished
 with state FAILED message Command exited with code 1 exitStatus 1
 14/06/27 14:56:54 INFO LocalActorRef: Message [akka.remote.transport.
 ActorTransportAdapter$DisassociateUnderlying] from
 Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/
 system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%
 2FsparkWorker%40130.49.226.148%3A53561-38#-1924573003] was not delivered.
 [10] dead letters encountered. This logging can be turned off or adjusted
 with configuration settings 'akka.log-dead-letters' and
 'akka.log-dead-letters-during-shutdown'.
 14/06/27 14:56:54 ERROR EndpointWriter: AssociationError
 [akka.tcp://sparkWorker@machine2:48019] - 
 [akka.tcp://sparkExecutor@machine2:60949]:
 Error [Association failed with [akka.tcp://sparkExecutor@machine2:60949]]
 [
 akka.remote.EndpointAssociationException: Association failed with
 [akka.tcp://sparkExecutor@machine2:60949]
 Caused by: 
 akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
 Connection refused: machine2/130.49.226.148:60949
 ]
 14/06/27 14:56:54 INFO Worker: Asked to launch executor
 app-20140627144512-0001/8 for Funtown, USA
 14/06/27 14:56:54 ERROR EndpointWriter: AssociationError
 [akka.tcp://sparkWorker@machine2:48019] - 
 [akka.tcp://sparkExecutor@machine2:60949]:
 Error [Association failed with [akka.tcp://sparkExecutor@machine2:60949]]
 [
 akka.remote.EndpointAssociationException: Association failed with
 [akka.tcp://sparkExecutor@machine2:60949]
 Caused by: 
 akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
 Connection refused: machine2/130.49.226.148:60949
 ]
 14/06/27 14:56:54 ERROR EndpointWriter: AssociationError
 [akka.tcp://sparkWorker@machine2:48019] - 
 [akka.tcp://sparkExecutor@machine2:60949]:
 Error [Association failed with [akka.tcp://sparkExecutor@machine2:60949]]
 [
 akka.remote.EndpointAssociationException: Association failed with
 [akka.tcp://sparkExecutor@machine2:60949]
 Caused by: 
 akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
 Connection refused: machine2/130.49.226.148:60949
 ]

 Port 48019 on machine2 is indeed open, connected, and listening. Any ideas?

 Thanks!

 Shannon

 On 6/27/14, 1:54 AM, sujeetv wrote:

 Try to explicitly set set the spark.driver.host property to the master's
 IP.
 Sujeet



 --
 View this message in context: http://apache-spark-user-list.
 1001560.n3.nabble.com/Spark-standalone-network-configuration-problems-
 tp8304p8396.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Spark standalone network configuration problems

2014-06-27 Thread Shannon Quinn
Apologies; can you advise as to how I would check that? I can certainly 
SSH from master to machine2.


On 6/27/14, 3:22 PM, Sujeet Varakhedi wrote:
Looks like your driver is not able to connect to the remote executor 
on machine2/130.49.226.148:60949 http://130.49.226.148:60949/.  Cn 
you check if the master machine can route to 130.49.226.148


Sujeet


On Fri, Jun 27, 2014 at 12:04 PM, Shannon Quinn squ...@gatech.edu 
mailto:squ...@gatech.edu wrote:


For some reason, commenting out spark.driver.host and
spark.driver.port fixed something...and broke something else (or
at least revealed another problem). For reference, the only lines
I have in my spark-defaults.conf now:

spark.app.name  myProg
spark.master    spark://192.168.1.101:5060
spark.executor.memory   8g
spark.files.overwrite   true

It starts up, but has problems with machine2. For some reason,
machine2 is having trouble communicating with *itself*. Here are
the worker logs of one of the failures (there are 10 before it
quits):


Spark assembly has been built with Hive, including Datanucleus
jars on classpath
14/06/27 14:55:13 INFO ExecutorRunner: Launch command: java
-cp

::/home/spark/spark-1.0.0-bin-hadoop2/conf:/home/spark/spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar
-XX:MaxPermSize=128m -Xms8192M -Xmx8192M
org.apache.spark.executor.CoarseGrainedExecutorBackend
akka.tcp://spark@machine1:46378/user/CoarseGrainedScheduler 7
machine2 8 akka.tcp://sparkWorker@machine2:48019/user/Worker
app-20140627144512-0001
14/06/27 14:56:54 INFO Worker: Executor app-20140627144512-0001/7
finished with state FAILED message Command exited with code 1
exitStatus 1
14/06/27 14:56:54 INFO LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying]
from Actor[akka://sparkWorker/deadLetters] to

Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%40130.49.226.148%3A53561-38#-1924573003]
was not delivered. [10] dead letters encountered. This logging can
be turned off or adjusted with configuration settings
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
14/06/27 14:56:54 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@machine2:48019] -
[akka.tcp://sparkExecutor@machine2:60949]: Error [Association
failed with [akka.tcp://sparkExecutor@machine2:60949]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@machine2:60949]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: machine2/130.49.226.148:60949
http://130.49.226.148:60949
]
14/06/27 14:56:54 INFO Worker: Asked to launch executor
app-20140627144512-0001/8 for Funtown, USA
14/06/27 14:56:54 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@machine2:48019] -
[akka.tcp://sparkExecutor@machine2:60949]: Error [Association
failed with [akka.tcp://sparkExecutor@machine2:60949]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@machine2:60949]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: machine2/130.49.226.148:60949
http://130.49.226.148:60949
]
14/06/27 14:56:54 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@machine2:48019] -
[akka.tcp://sparkExecutor@machine2:60949]: Error [Association
failed with [akka.tcp://sparkExecutor@machine2:60949]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@machine2:60949]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: machine2/130.49.226.148:60949
http://130.49.226.148:60949
]

Port 48019 on machine2 is indeed open, connected, and listening.
Any ideas?

Thanks!

Shannon

On 6/27/14, 1:54 AM, sujeetv wrote:

Try to explicitly set set the spark.driver.host property to
the master's
IP.
Sujeet



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-network-configuration-problems-tp8304p8396.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.







RE: problem when start spark streaming in cluster mode

2014-06-27 Thread Haoming Zhang
Hi Siyuan,

Can you try this solution? 
http://stackoverflow.com/questions/21943353/akka-2-3-0-fails-to-load-slf4jeventhandler-class-with-java-lang-classnotfounde
 

Best
Date: Fri, 27 Jun 2014 14:18:59 -0400
Subject: problem when start spark streaming in cluster mode
From: hsy...@gmail.com
To: user@spark.apache.org

Hi all,
I can start a spark streaming app in Client mode on a Pseudo-standalone 
cluster on my local machine. 

However when I tried to start it in Cluster mode. It always got the following 
exception on the Driver.

Exception in thread main akka.ConfigurationException: Could not start logger 
due to [akka.ConfigurationException: Logger specified in config can't be loaded 
[akka.event.slf4j.Slf4jLogger] due to 
[akka.event.Logging$LoggerInitializationException: Logger log1-Slf4jLogger did 
not respond with LoggerInitialized, sent instead [TIMEOUT]]]
Can someone help?
Thanks,
siyuan

jackson-core-asl jar (1.8.8 vs 1.9.x) conflict with the spark-sql (version 1.x)

2014-06-27 Thread M Singh
Hi:

I am using Spark to stream data to Cassandra and it works fine in local mode. 
But when I execute the application in a standalone clustered env I get the 
exception included below (java.lang.NoClassDefFoundError: 
org/codehaus/jackson/annotate/JsonClass).

I think this is due to the jackson-core-asl dependency conflict 
(jackson-core-asl 1.8.8 has the JsonClass but 1.9.x does not).  The 1.9.x 
version is being pulled in by spark-sql project.  I tried adding 
jackson-core-asl 1.8.8 with --jars argument while submitting the application 
for execution but it did not work.  So I created a custom spark build excluding 
sql project.  With this custom spark install I was able to resolve the issue at 
least on a single node cluster (separate master and worker).  

If there is an alternate way to resolve this conflicting jar issue without a 
custom build (eg: configuration to use the user defined jars in the executor 
class path first), please let me know.

Also, is there a comprehensive list of configuration properties available for 
spark ?


Thanks

Mans


Exception trace


 TaskSetManager: Loss was due to java.lang.NoClassDefFoundError
java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass
at 
org.codehaus.jackson.map.introspect.JacksonAnnotationIntrospector.findDeserializationType(JacksonAnnotationIntrospector.java:524)
at 
org.codehaus.jackson.map.deser.BasicDeserializerFactory.modifyTypeByAnnotation(BasicDeserializerFactory.java:732)
at
 
org.codehaus.jackson.map.deser.BasicDeserializerFactory.createCollectionDeserializer(BasicDeserializerFactory.java:229)
at 
org.codehaus.jackson.map.deser.StdDeserializerProvider._createDeserializer(StdDeserializerProvider.java:386)
at 
org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCache2(StdDeserializerProvider.java:307)
at 
org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCacheValueDeserializer(StdDeserializerProvider.java:287)
at 
org.codehaus.jackson.map.deser.StdDeserializerProvider.findValueDeserializer(StdDeserializerProvider.java:136)
at 
org.codehaus.jackson.map.deser.StdDeserializerProvider.findTypedValueDeserializer(StdDeserializerProvider.java:157)
at 
org.codehaus.jackson.map.ObjectMapper._findRootDeserializer(ObjectMapper.java:2468)
at
 org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2402)
at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1602)
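
One setting that may avoid the custom build, assuming Spark 1.0's experimental spark.files.userClassPathFirst property behaves as documented (it only affects executor class loading, and the jar path below is a placeholder):

import org.apache.spark.SparkConf

// Sketch only: ship the older jackson jar with the job and ask executors to
// prefer user jars over Spark's own classpath. Experimental in Spark 1.0.
val conf = new SparkConf()
  .set("spark.files.userClassPathFirst", "true")
  .setJars(Seq("/path/to/jackson-core-asl-1.8.8.jar"))   // placeholder path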

Re: TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

2014-06-27 Thread Peng Cheng
I give up; communication must be blocked by the complex EC2 network topology
(though the error information indeed needs some improvement). It doesn't make
sense to run a client thousands of miles away that communicates frequently with
the workers. I have moved everything to EC2 now.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tp8247p8444.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Improving Spark multithreaded performance?

2014-06-27 Thread Xiangrui Meng
Hi Kyle,

A few questions:

1) Did you use `setIntercept(true)`?
2) How many features?

I'm a little worried about driver's load because the final aggregation
and weights update happen on the driver. Did you check driver's memory
usage as well?

Best,
Xiangrui

On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:
 As far as I can tell there is no data to broadcast (unless there is
 something internal to mllib that needs to be broadcast) I've coalesced the
 input RDDs to keep the number of partitions limited. When running, I've
 tried to get up to 500 concurrent stages, and I've coalesced the RDDs down
 to 2 partitions, so about 1000 tasks.
 Despite having over 500 threads in the threadpool working on mllib tasks,
 the total CPU usage never really goes above 150%.
 I've tried increasing 'spark.akka.threads' but that doesn't seem to do
 anything.

 My one thought would be that maybe because I'm using MLUtils.kFold to
 generate the RDDs is that because I have so many tasks working off RDDs that
 are permutations of original RDDs that maybe that is creating some sort of
 dependency bottleneck.

 Kyle


 On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson ilike...@gmail.com wrote:

 I don't have specific solutions for you, but the general things to try
 are:

 - Decrease task size by broadcasting any non-trivial objects.
 - Increase duration of tasks by making them less fine-grained.

 How many tasks are you sending? I've seen in the past something like 25
 seconds for ~10k total medium-sized tasks.


 On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu
 wrote:

 I'm working to set up a calculation that involves calling mllib's
 SVMWithSGD.train several thousand times on different permutations of the
 data. I'm trying to run the separate jobs using a threadpool to dispatch the
  different requests to a spark context connected to a Mesos cluster, using
  coarse scheduling, and a max of 2000 cores on Spark 1.0.
 Total utilization of the system is terrible. Most of the 'aggregate at
 GradientDescent.scala:178' stages(where mllib spends most of its time) take
 about 3 seconds, but have ~25 seconds of scheduler delay time.
 What kind of things can I do to improve this?

 Kyle





Re: TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

2014-06-27 Thread Xiangrui Meng
Try to use --executor-memory 12g with spark-submit. Or you can set it
in conf/spark-defaults.conf and rsync it to all workers and then
restart. -Xiangrui

On Fri, Jun 27, 2014 at 1:05 PM, Peng Cheng pc...@uow.edu.au wrote:
 I give up, communication must be blocked by the complex EC2 network topology
 (though the error information indeed need some improvement). It doesn't make
 sense to run a client thousands miles away to communicate frequently with
 workers. I have moved everything to EC2 now.



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tp8247p8444.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
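
For reference, a minimal sketch of the equivalent ways to pass that value (12g is just the figure suggested above):

import org.apache.spark.SparkConf

// From code, before the SparkContext is created:
val conf = new SparkConf().set("spark.executor.memory", "12g")

// Or on the command line:         spark-submit --executor-memory 12g ...
// Or in conf/spark-defaults.conf: spark.executor.memory   12g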


Integrate spark-shell into officially supported web ui/api plug-in? What do you think?

2014-06-27 Thread Peng Cheng
This would be handy for demos and quick prototyping, as the command-line REPL
doesn't support a lot of editor features; also, you wouldn't need to SSH into
your worker/master if your client is behind a NAT wall. Since the Spark
codebase has a minimalistic design philosophy, I don't think this component
can make it into the main repository. However, it could be an independent project
that is also supported by the community (like Solr/Elasticsearch to Lucene).

I've reviewed and tested a few REPL web UIs, including:
I've reviewed and tested a few REPL web ui including:
- Scala-notebook: https://github.com/Bridgewater/scala-notebook
- Tinsmiths: https://github.com/kouphax/tinsmith
- IScala: https://github.com/mattpap/IScala
- Codebrew: https://codebrew.io/

however, they are either too heavyweight, or their ILoop is buried very deep
(sometimes even in another library). I'm interested in working on this part;
has anyone experimented with a similar solution before?

Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Integrate-spark-shell-into-officially-supported-web-ui-api-plug-in-What-do-you-think-tp8447.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: problem when start spark streaming in cluster mode

2014-06-27 Thread Siyuan he
Hey Haoming,

Actually akka.loggers has already been set to
akka.event.slf4j.Slf4jLogger. You can check
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala

Regards,
SY


On Fri, Jun 27, 2014 at 3:55 PM, Haoming Zhang haoming.zh...@outlook.com
wrote:

 Hi Siyuan,

 Can you try this solution?
 http://stackoverflow.com/questions/21943353/akka-2-3-0-fails-to-load-slf4jeventhandler-class-with-java-lang-classnotfounde

 Best
 --
 Date: Fri, 27 Jun 2014 14:18:59 -0400
 Subject: problem when start spark streaming in cluster mode
 From: hsy...@gmail.com
 To: user@spark.apache.org


 Hi all,

 I can start a spark streaming app in Client mode on a Pseudo-standalone
 cluster on my local machine.

 However when I tried to start it in Cluster mode. It always got the
 following exception on the Driver.

 Exception in thread main akka.ConfigurationException: Could not start 
 logger due to [akka.ConfigurationException: Logger specified in config can't 
 be loaded [akka.event.slf4j.Slf4jLogger] due to 
 [akka.event.Logging$LoggerInitializationException: Logger log1-Slf4jLogger 
 did not respond with LoggerInitialized, sent instead [TIMEOUT]]]

 Can someone help?

 Thanks,
 siyuan



Could not compute split, block not found

2014-06-27 Thread Bill Jay
Hi,

I am running a Spark Streaming job with 1 minute as the batch size. It ran
for around 84 minutes and was then killed because of an exception with the
following information:

*java.lang.Exception: Could not compute split, block input-0-1403893740400
not found*


Before it was killed, it was able to correctly generate output for each
batch.

Any help on this will be greatly appreciated.

Bill


Re: ElasticSearch enrich

2014-06-27 Thread boci
OK, I found dynamic resources, but I have a frustrating problem. This is the
flow:
kafka -> enrich X -> enrich Y -> enrich Z -> foreachRDD -> save

My problem is: if I do this it doesn't work, the enrich functions are not called,
but if I put a print in, they are. For example, if I do this:
kafka -> enrich X -> enrich Y -> print -> enrich Z -> foreachRDD

then enrich X and enrich Y are called but enrich Z is not.
If I put the print after enrich Z, it will be printed. How can I solve
this? (What can I do to get the foreachRDD called? I put a breakpoint inside the map
function (where I generate the writable) but it's not hit.)

Any idea?

b0c1



--
Skype: boci13, Hangout: boci.b...@gmail.com


On Fri, Jun 27, 2014 at 4:53 PM, boci boci.b...@gmail.com wrote:

 Another question. In the foreachRDD I will initialize the JobConf, but in
 this place how can I get information from the items?
 I have an identifier in the data which identify the required ES index (so
 how can I set dynamic index in the foreachRDD) ?

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau hol...@pigscanfly.ca
 wrote:

 Just your luck I happened to be working on that very talk today :) Let me
 know how your experiences with Elasticsearch  Spark go :)


 On Thu, Jun 26, 2014 at 3:17 PM, boci boci.b...@gmail.com wrote:

 Wow, thanks your fast answer, it's help a lot...

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau hol...@pigscanfly.ca
 wrote:

 Hi b0c1,

 I have an example of how to do this in the repo for my talk as well,
 the specific example is at
 https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
 . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
 then call  saveAsHadoopDataset on the RDD that gets passed into the
 function we provide to foreachRDD.

 e.g.

 stream.foreachRDD{(data, time) =>
  val jobConf = ...
  data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
 }

 Hope that helps :)

 Cheers,

 Holden :)


 On Thu, Jun 26, 2014 at 2:23 PM, boci boci.b...@gmail.com wrote:

 Thanks. I without local option I can connect with es remote, now I
 only have one problem. How can I use elasticsearch-hadoop with spark
 streaming? I mean DStream doesn't have saveAsHadoopFiles method, my
 second problem the output index is depend by the input data.

 Thanks


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:

 You can just add elasticsearch-hadoop as a dependency to your project
 to user the ESInputFormat and ESOutputFormat (
 https://github.com/elasticsearch/elasticsearch-hadoop). Some other
 basics here:
 http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html

 For testing, yes I think you will need to start ES in local mode
 (just ./bin/elasticsearch) and use the default config (host = localhost,
 port = 9200).


 On Thu, Jun 26, 2014 at 9:04 AM, boci boci.b...@gmail.com wrote:

 That's okay, but hadoop has ES integration. what happened if I run
 saveAsHadoopFile without hadoop (or I must need to pull up hadoop
 programatically? (if I can))

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau hol...@pigscanfly.ca
 wrote:



 On Wed, Jun 25, 2014 at 4:16 PM, boci boci.b...@gmail.com wrote:

 Hi guys, thanks the direction now I have some problem/question:
 - in local (test) mode I want to use ElasticClient.local to create
 es connection, but in prodution I want to use ElasticClient.remote, 
 to this
 I want to pass ElasticClient to mapPartitions, or what is the
 best practices?

 In this case you probably want to make the ElasticClient inside of
 mapPartitions (since it isn't serializable) and if you want to use a
 different client in local mode just have a flag that control what type 
 of
 client you create.

 - my stream output is write into elasticsearch. How can I
 test output.saveAsHadoopFile[ESOutputFormat](-) in local 
 environment?


 - After store the enriched data into ES, I want to generate
 aggregated data (EsInputFormat) how can I test it in local?

 I think the simplest thing to do would be use the same client in
 mode and just start single node 
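
A rough sketch of the dynamic-index idea, assuming an elasticsearch-hadoop release that accepts a {field} pattern in es.resource.write; the property values, the ESOutputFormat class name and the prepareDataForIndexing helper are assumptions carried over from this thread, so verify them against the version you use:

import org.apache.hadoop.io.{MapWritable, NullWritable}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext._                // pair-RDD implicits for saveAsHadoopDataset
import org.elasticsearch.hadoop.mr.ESOutputFormat     // EsOutputFormat in newer releases

stream.foreachRDD { (rdd, time) =>
  // Sketch only: build the JobConf per batch and route each document to the
  // index named by one of its fields ("indexName" is a placeholder field).
  val jobConf = new JobConf()
  jobConf.set("es.nodes", "localhost:9200")            // placeholder ES address
  jobConf.set("es.resource.write", "{indexName}/doc")  // dynamic index, if supported
  jobConf.setOutputFormat(classOf[ESOutputFormat])
  jobConf.setOutputKeyClass(classOf[NullWritable])
  jobConf.setOutputValueClass(classOf[MapWritable])
  rdd.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
}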

Re: ElasticSearch enrich

2014-06-27 Thread Holden Karau
So a few quick questions:

1) What cluster are you running this against? Is it just local? Have you
tried local[4]?
2) When you say breakpoint, how are you setting this break point? There is
a good chance your breakpoint mechanism doesn't work in a distributed
environment, could you instead cause a side effect (like writing to a file)?

Cheers,

Holden :)


On Fri, Jun 27, 2014 at 2:04 PM, boci boci.b...@gmail.com wrote:

 Ok I found dynamic resources, but I have a frustrating problem. This is
 the flow:
 kafka - enrich X - enrich Y - enrich Z - foreachRDD - save

 My problem is: if I do this it's not work, the enrich functions not
 called, but if I put a print it's does. for example if I do this:
 kafka - enrich X - enrich Y - print - enrich Z - foreachRDD

 The enrich X and enrich Y called but enrich Z not
 if I put the print after the enrich Z it's will be printed. How can I
 solve this? (what can I do to call the foreachRDD I put breakpoint inside
 the map function (where I'm generate the writable) but it's not called)

 Any idea?

 b0c1




 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Fri, Jun 27, 2014 at 4:53 PM, boci boci.b...@gmail.com wrote:

 Another question. In the foreachRDD I will initialize the JobConf, but in
 this place how can I get information from the items?
 I have an identifier in the data which identify the required ES index (so
 how can I set dynamic index in the foreachRDD) ?

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau hol...@pigscanfly.ca
 wrote:

 Just your luck I happened to be working on that very talk today :) Let
 me know how your experiences with Elasticsearch  Spark go :)


 On Thu, Jun 26, 2014 at 3:17 PM, boci boci.b...@gmail.com wrote:

 Wow, thanks your fast answer, it's help a lot...

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau hol...@pigscanfly.ca
 wrote:

 Hi b0c1,

 I have an example of how to do this in the repo for my talk as well,
 the specific example is at
 https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
 . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
 then call  saveAsHadoopDataset on the RDD that gets passed into the
 function we provide to foreachRDD.

 e.g.

  stream.foreachRDD{(data, time) =>
   val jobConf = ...
  data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
 }

 Hope that helps :)

 Cheers,

 Holden :)


 On Thu, Jun 26, 2014 at 2:23 PM, boci boci.b...@gmail.com wrote:

 Thanks. I without local option I can connect with es remote, now I
 only have one problem. How can I use elasticsearch-hadoop with spark
 streaming? I mean DStream doesn't have saveAsHadoopFiles method, my
 second problem the output index is depend by the input data.

 Thanks


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:

 You can just add elasticsearch-hadoop as a dependency to your
 project to user the ESInputFormat and ESOutputFormat (
 https://github.com/elasticsearch/elasticsearch-hadoop). Some other
 basics here:
 http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html

 For testing, yes I think you will need to start ES in local mode
 (just ./bin/elasticsearch) and use the default config (host = localhost,
 port = 9200).


 On Thu, Jun 26, 2014 at 9:04 AM, boci boci.b...@gmail.com wrote:

 That's okay, but hadoop has ES integration. what happened if I run
 saveAsHadoopFile without hadoop (or I must need to pull up hadoop
 programatically? (if I can))

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau hol...@pigscanfly.ca
  wrote:



 On Wed, Jun 25, 2014 at 4:16 PM, boci boci.b...@gmail.com wrote:

 Hi guys, thanks the direction now I have some problem/question:
 - in local (test) mode I want to use ElasticClient.local to
 create es connection, but in prodution I want to use 
 ElasticClient.remote,
 to this I want to pass ElasticClient to mapPartitions, or what
 is the best practices?

 In this case you probably want to make the ElasticClient inside of
 mapPartitions (since it isn't serializable) and if you want to use a
 different 

RE: ElasticSearch enrich

2014-06-27 Thread Adrian Mocanu
b0c1, could you post your code? I am interested in your solution.

Thanks
Adrian

From: boci [mailto:boci.b...@gmail.com]
Sent: June-26-14 6:17 PM
To: user@spark.apache.org
Subject: Re: ElasticSearch enrich

Wow, thanks your fast answer, it's help a lot...

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.commailto:boci.b...@gmail.com

On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau 
hol...@pigscanfly.camailto:hol...@pigscanfly.ca wrote:
Hi b0c1,

I have an example of how to do this in the repo for my talk as well, the 
specific example is at 
https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
 . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and then 
call  saveAsHadoopDataset on the RDD that gets passed into the function we 
provide to foreachRDD.

e.g.

stream.foreachRDD{(data, time) =>
 val jobConf = ...
 data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
}

Hope that helps :)

Cheers,

Holden :)

On Thu, Jun 26, 2014 at 2:23 PM, boci 
boci.b...@gmail.commailto:boci.b...@gmail.com wrote:
Thanks. I without local option I can connect with es remote, now I only have 
one problem. How can I use elasticsearch-hadoop with spark streaming? I mean 
DStream doesn't have saveAsHadoopFiles method, my second problem the output 
index is depend by the input data.

Thanks

--
Skype: boci13, Hangout: boci.b...@gmail.commailto:boci.b...@gmail.com

On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath 
nick.pentre...@gmail.commailto:nick.pentre...@gmail.com wrote:
You can just add elasticsearch-hadoop as a dependency to your project to user 
the ESInputFormat and ESOutputFormat 
(https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics 
here: 
http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html

For testing, yes I think you will need to start ES in local mode (just 
./bin/elasticsearch) and use the default config (host = localhost, port = 9200).

On Thu, Jun 26, 2014 at 9:04 AM, boci 
boci.b...@gmail.commailto:boci.b...@gmail.com wrote:
That's okay, but hadoop has ES integration. what happened if I run 
saveAsHadoopFile without hadoop (or I must need to pull up hadoop 
programatically? (if I can))

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.commailto:boci.b...@gmail.com

On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau 
hol...@pigscanfly.camailto:hol...@pigscanfly.ca wrote:

On Wed, Jun 25, 2014 at 4:16 PM, boci 
boci.b...@gmail.commailto:boci.b...@gmail.com wrote:
Hi guys, thanks the direction now I have some problem/question:
- in local (test) mode I want to use ElasticClient.local to create es 
connection, but in prodution I want to use ElasticClient.remote, to this I want 
to pass ElasticClient to mapPartitions, or what is the best practices?
In this case you probably want to make the ElasticClient inside of 
mapPartitions (since it isn't serializable) and if you want to use a different 
client in local mode just have a flag that control what type of client you 
create.
- my stream output is write into elasticsearch. How can I test 
output.saveAsHadoopFile[ESOutputFormat](-) in local environment?
- After store the enriched data into ES, I want to generate aggregated data 
(EsInputFormat) how can I test it in local?
I think the simplest thing to do would be use the same client in mode and just 
start single node elastic search cluster.

Thanks guys

b0c1



--
Skype: boci13, Hangout: boci.b...@gmail.commailto:boci.b...@gmail.com

On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau 
hol...@pigscanfly.camailto:hol...@pigscanfly.ca wrote:
So I'm giving a talk at the Spark summit on using Spark  ElasticSearch, but 
for now if you want to see a simple demo which uses elasticsearch for geo input 
you can take a look at my quick  dirty implementation with 
TopTweetsInALocation ( 
https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala
 ). This approach uses the ESInputFormat which avoids the difficulty of having 
to manually create ElasticSearch clients.

This approach might not work for your data, e.g. if you need to create a query 
for each record in your RDD. If this is the case, you could instead look at 
using mapPartitions and setting up your Elasticsearch connection inside of 
that, so you could then re-use 

Re: ElasticSearch enrich

2014-06-27 Thread boci
This is a simple ScalaTest. I start a SparkConf, set the master to local
(set the serializer etc.), pull up the Kafka and ES connections, send a message to
Kafka and wait 30 sec for processing.

It runs in IDEA, no magic trick.

b0c1

--
Skype: boci13, Hangout: boci.b...@gmail.com


On Fri, Jun 27, 2014 at 11:11 PM, Holden Karau hol...@pigscanfly.ca wrote:

 So a few quick questions:

 1) What cluster are you running this against? Is it just local? Have you
 tried local[4]?
 2) When you say breakpoint, how are you setting this break point? There is
 a good chance your breakpoint mechanism doesn't work in a distributed
 environment, could you instead cause a side effect (like writing to a file)?

 Cheers,

 Holden :)


 On Fri, Jun 27, 2014 at 2:04 PM, boci boci.b...@gmail.com wrote:

 Ok I found dynamic resources, but I have a frustrating problem. This is
 the flow:
 kafka - enrich X - enrich Y - enrich Z - foreachRDD - save

 My problem is: if I do this it's not work, the enrich functions not
 called, but if I put a print it's does. for example if I do this:
 kafka - enrich X - enrich Y - print - enrich Z - foreachRDD

 The enrich X and enrich Y called but enrich Z not
 if I put the print after the enrich Z it's will be printed. How can I
 solve this? (what can I do to call the foreachRDD I put breakpoint inside
 the map function (where I'm generate the writable) but it's not called)

 Any idea?

 b0c1




 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Fri, Jun 27, 2014 at 4:53 PM, boci boci.b...@gmail.com wrote:

 Another question. In the foreachRDD I will initialize the JobConf, but
 in this place how can I get information from the items?
 I have an identifier in the data which identify the required ES index
 (so how can I set dynamic index in the foreachRDD) ?

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau hol...@pigscanfly.ca
 wrote:

 Just your luck I happened to be working on that very talk today :) Let
 me know how your experiences with Elasticsearch  Spark go :)


 On Thu, Jun 26, 2014 at 3:17 PM, boci boci.b...@gmail.com wrote:

 Wow, thanks your fast answer, it's help a lot...

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau hol...@pigscanfly.ca
 wrote:

 Hi b0c1,

 I have an example of how to do this in the repo for my talk as well,
 the specific example is at
 https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
 . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
 then call  saveAsHadoopDataset on the RDD that gets passed into the
 function we provide to foreachRDD.

 e.g.

  stream.foreachRDD{(data, time) =>
   val jobConf = ...
  data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
 }

 Hope that helps :)

 Cheers,

 Holden :)


 On Thu, Jun 26, 2014 at 2:23 PM, boci boci.b...@gmail.com wrote:

 Thanks. I without local option I can connect with es remote, now I
 only have one problem. How can I use elasticsearch-hadoop with spark
 streaming? I mean DStream doesn't have saveAsHadoopFiles method, my
 second problem the output index is depend by the input data.

 Thanks


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:

 You can just add elasticsearch-hadoop as a dependency to your
 project to user the ESInputFormat and ESOutputFormat (
 https://github.com/elasticsearch/elasticsearch-hadoop). Some other
 basics here:
 http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html

 For testing, yes I think you will need to start ES in local mode
 (just ./bin/elasticsearch) and use the default config (host = 
 localhost,
 port = 9200).


 On Thu, Jun 26, 2014 at 9:04 AM, boci boci.b...@gmail.com wrote:

 That's okay, but hadoop has ES integration. what happened if I run
 saveAsHadoopFile without hadoop (or I must need to pull up hadoop
 programatically? (if I can))

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau 
 hol...@pigscanfly.ca wrote:



 On Wed, Jun 25, 2014 

Re: Spark standalone network configuration problems

2014-06-27 Thread Shannon Quinn
I switched which machine was the master and which was the dedicated 
worker, and now it works just fine. I discovered machine2 is on my 
department's DMZ; machine1 is not. I suspect the departmental firewall 
was causing problems. By moving the master to machine2, that seems to 
have solved my problems.


Thank you all very much for your help. I'm sure I'll have other 
questions soon :)


Regards,
Shannon

On 6/27/14, 3:22 PM, Sujeet Varakhedi wrote:
Looks like your driver is not able to connect to the remote executor 
on machine2/130.49.226.148:60949. Can 
you check if the master machine can route to 130.49.226.148


Sujeet


On Fri, Jun 27, 2014 at 12:04 PM, Shannon Quinn squ...@gatech.edu 
mailto:squ...@gatech.edu wrote:


For some reason, commenting out spark.driver.host and
spark.driver.port fixed something...and broke something else (or
at least revealed another problem). For reference, the only lines
I have in my spark-defaults.conf now:

spark.app.name          myProg
spark.master            spark://192.168.1.101:5060
spark.executor.memory   8g
spark.files.overwrite   true

It starts up, but has problems with machine2. For some reason,
machine2 is having trouble communicating with *itself*. Here are
the worker logs of one of the failures (there are 10 before it
quits):


Spark assembly has been built with Hive, including Datanucleus
jars on classpath
14/06/27 14:55:13 INFO ExecutorRunner: Launch command: java
-cp

::/home/spark/spark-1.0.0-bin-hadoop2/conf:/home/spark/spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar
-XX:MaxPermSize=128m -Xms8192M -Xmx8192M
org.apache.spark.executor.CoarseGrainedExecutorBackend
akka.tcp://spark@machine1:46378/user/CoarseGrainedScheduler 7
machine2 8 akka.tcp://sparkWorker@machine2:48019/user/Worker
app-20140627144512-0001
14/06/27 14:56:54 INFO Worker: Executor app-20140627144512-0001/7
finished with state FAILED message Command exited with code 1
exitStatus 1
14/06/27 14:56:54 INFO LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying]
from Actor[akka://sparkWorker/deadLetters] to

Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%40130.49.226.148%3A53561-38#-1924573003]
was not delivered. [10] dead letters encountered. This logging can
be turned off or adjusted with configuration settings
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
14/06/27 14:56:54 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@machine2:48019] -
[akka.tcp://sparkExecutor@machine2:60949]: Error [Association
failed with [akka.tcp://sparkExecutor@machine2:60949]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@machine2:60949]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: machine2/130.49.226.148:60949
http://130.49.226.148:60949
]
14/06/27 14:56:54 INFO Worker: Asked to launch executor
app-20140627144512-0001/8 for Funtown, USA
14/06/27 14:56:54 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@machine2:48019] -
[akka.tcp://sparkExecutor@machine2:60949]: Error [Association
failed with [akka.tcp://sparkExecutor@machine2:60949]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@machine2:60949]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: machine2/130.49.226.148:60949
http://130.49.226.148:60949
]
14/06/27 14:56:54 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkWorker@machine2:48019] -
[akka.tcp://sparkExecutor@machine2:60949]: Error [Association
failed with [akka.tcp://sparkExecutor@machine2:60949]] [
akka.remote.EndpointAssociationException: Association failed with
[akka.tcp://sparkExecutor@machine2:60949]
Caused by:
akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
Connection refused: machine2/130.49.226.148:60949
http://130.49.226.148:60949
]

Port 48019 on machine2 is indeed open, connected, and listening.
Any ideas?

Thanks!

Shannon

On 6/27/14, 1:54 AM, sujeetv wrote:

Try to explicitly set set the spark.driver.host property to
the master's
IP.
Sujeet



--
View this message in context:


Re: ElasticSearch enrich

2014-06-27 Thread Holden Karau
Try setting the master to local[4]


On Fri, Jun 27, 2014 at 2:17 PM, boci boci.b...@gmail.com wrote:

 This is a simply scalatest. I start a SparkConf, set the master to local
 (set the serializer etc), pull up kafka and es connection send a message to
 kafka and wait 30sec to processing.

 It's run in IDEA no magick trick.

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Fri, Jun 27, 2014 at 11:11 PM, Holden Karau hol...@pigscanfly.ca
 wrote:

 So a few quick questions:

 1) What cluster are you running this against? Is it just local? Have you
 tried local[4]?
 2) When you say breakpoint, how are you setting this break point? There
 is a good chance your breakpoint mechanism doesn't work in a distributed
 environment, could you instead cause a side effect (like writing to a file)?

 Cheers,

 Holden :)


 On Fri, Jun 27, 2014 at 2:04 PM, boci boci.b...@gmail.com wrote:

 Ok I found dynamic resources, but I have a frustrating problem. This is
 the flow:
 kafka - enrich X - enrich Y - enrich Z - foreachRDD - save

 My problem is: if I do this it's not work, the enrich functions not
 called, but if I put a print it's does. for example if I do this:
 kafka - enrich X - enrich Y - print - enrich Z - foreachRDD

 The enrich X and enrich Y called but enrich Z not
 if I put the print after the enrich Z it's will be printed. How can I
 solve this? (what can I do to call the foreachRDD I put breakpoint inside
 the map function (where I'm generate the writable) but it's not called)

 Any idea?

 b0c1




 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Fri, Jun 27, 2014 at 4:53 PM, boci boci.b...@gmail.com wrote:

 Another question. In the foreachRDD I will initialize the JobConf, but
 in this place how can I get information from the items?
 I have an identifier in the data which identify the required ES index
 (so how can I set dynamic index in the foreachRDD) ?

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau hol...@pigscanfly.ca
 wrote:

 Just your luck I happened to be working on that very talk today :) Let
 me know how your experiences with Elasticsearch  Spark go :)


 On Thu, Jun 26, 2014 at 3:17 PM, boci boci.b...@gmail.com wrote:

 Wow, thanks your fast answer, it's help a lot...

 b0c1


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau hol...@pigscanfly.ca
 wrote:

 Hi b0c1,

 I have an example of how to do this in the repo for my talk as well,
 the specific example is at
 https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala
 . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and
 then call  saveAsHadoopDataset on the RDD that gets passed into the
 function we provide to foreachRDD.

 e.g.

  stream.foreachRDD{(data, time) =>
   val jobConf = ...
  data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf)
 }

 Hope that helps :)

 Cheers,

 Holden :)


 On Thu, Jun 26, 2014 at 2:23 PM, boci boci.b...@gmail.com wrote:

 Thanks. I without local option I can connect with es remote, now I
 only have one problem. How can I use elasticsearch-hadoop with spark
 streaming? I mean DStream doesn't have saveAsHadoopFiles method, my
 second problem the output index is depend by the input data.

 Thanks


 --
 Skype: boci13, Hangout: boci.b...@gmail.com


 On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath 
 nick.pentre...@gmail.com wrote:

 You can just add elasticsearch-hadoop as a dependency to your
 project to user the ESInputFormat and ESOutputFormat (
 https://github.com/elasticsearch/elasticsearch-hadoop). Some
 other basics here:
 http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html

 For testing, yes I think you will need to start ES in local mode
 (just ./bin/elasticsearch) and use the default config (host = 
 localhost,
 port = 9200).


 On Thu, Jun 26, 2014 at 9:04 AM, boci boci.b...@gmail.com wrote:

 That's okay, but hadoop has ES integration. what happened if I
 run saveAsHadoopFile without hadoop (or I must need to pull up hadoop
 programatically? (if I can))

 b0c1


 --
 Skype: boci13, Hangout: 
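
For anyone new to the syntax, "local[4]" simply means an in-process master with four worker threads instead of one; a minimal sketch:

import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("test").setMaster("local[4]")   // 4 local threads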

Re: Improving Spark multithreaded performance?

2014-06-27 Thread Kyle Ellrott
1) I'm using the static SVMWithSGD.train, with no options.
2) I have about 20,000 features (~5000 samples) that are being attached and
trained against 14,000 different sets of labels (ie I'll be doing 14,000
different training runs against the same sets of features trying to figure
out which labels can be learned), and I would also like to do cross fold
validation.

The driver doesn't seem to be using too much memory. I left it as -Xmx8g
and it never complained.

Kyle



On Fri, Jun 27, 2014 at 1:18 PM, Xiangrui Meng men...@gmail.com wrote:

 Hi Kyle,

 A few questions:

 1) Did you use `setIntercept(true)`?
 2) How many features?

 I'm a little worried about driver's load because the final aggregation
 and weights update happen on the driver. Did you check driver's memory
 usage as well?

 Best,
 Xiangrui

 On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott kellr...@soe.ucsc.edu
 wrote:
  As far as I can tell there is no data to broadcast (unless there is
  something internal to mllib that needs to be broadcast) I've coalesced
 the
  input RDDs to keep the number of partitions limited. When running, I've
  tried to get up to 500 concurrent stages, and I've coalesced the RDDs
 down
  to 2 partitions, so about 1000 tasks.
  Despite having over 500 threads in the threadpool working on mllib tasks,
  the total CPU usage never really goes above 150%.
  I've tried increasing 'spark.akka.threads' but that doesn't seem to do
  anything.
 
  My one thought would be that maybe because I'm using MLUtils.kFold to
  generate the RDDs is that because I have so many tasks working off RDDs
 that
  are permutations of original RDDs that maybe that is creating some sort
 of
  dependency bottleneck.
 
  Kyle
 
 
  On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson ilike...@gmail.com
 wrote:
 
  I don't have specific solutions for you, but the general things to try
  are:
 
  - Decrease task size by broadcasting any non-trivial objects.
  - Increase duration of tasks by making them less fine-grained.
 
  How many tasks are you sending? I've seen in the past something like 25
  seconds for ~10k total medium-sized tasks.
 
 
  On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu
  wrote:
 
  I'm working to set up a calculation that involves calling mllib's
  SVMWithSGD.train several thousand times on different permutations of
 the
  data. I'm trying to run the separate jobs using a threadpool to
 dispatch the
   different requests to a spark context connected to a Mesos cluster,
  using
   coarse scheduling, and a max of 2000 cores on Spark 1.0.
  Total utilization of the system is terrible. Most of the 'aggregate at
  GradientDescent.scala:178' stages(where mllib spends most of its time)
 take
  about 3 seconds, but have ~25 seconds of scheduler delay time.
  What kind of things can I do to improve this?
 
  Kyle
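
To make the threading pattern under discussion concrete, here is a minimal sketch of dispatching many SVMWithSGD.train calls against one shared SparkContext from a fixed-size pool; the label sets, pool size and iteration count are placeholders, not Kyle's actual setup:

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Sketch only: 32 threads each submit independent training jobs to the same context.
implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(32))

def trainAll(labelSets: Seq[RDD[LabeledPoint]]): Future[Seq[SVMModel]] =
  Future.sequence(labelSets.map { data =>
    Future { SVMWithSGD.train(data, 100) }   // numIterations = 100, placeholder
  })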
 
 
 



Re: Spark vs Google cloud dataflow

2014-06-27 Thread Marco Shaw
Dean: Some interesting information... Do you know where I can read more about 
these coming changes to Scalding/Cascading?

 On Jun 27, 2014, at 9:40 AM, Dean Wampler deanwamp...@gmail.com wrote:
 
 ... and to be clear on the point, Summingbird is not limited to MapReduce. It 
 abstracts over Scalding (which abstracts over Cascading, which is being moved 
 from MR to Spark) and over Storm for event processing.
 
 
 On Fri, Jun 27, 2014 at 7:16 AM, Sean Owen so...@cloudera.com wrote:
 On Thu, Jun 26, 2014 at 9:15 AM, Aureliano Buendia buendia...@gmail.com 
 wrote:
  Summingbird is for map/reduce. Dataflow is the third generation of google's
  map/reduce, and it generalizes map/reduce the way Spark does. See more 
  about
  this here: http://youtu.be/wtLJPvx7-ys?t=2h37m8s
 
 Yes, my point was that Summingbird is similar in that it is a
 higher-level service for batch/streaming computation, not that it is
 similar for being MapReduce-based.
 
  It seems Dataflow is based on this paper:
  http://pages.cs.wisc.edu/~akella/CS838/F12/838-CloudPapers/FlumeJava.pdf
 
 FlumeJava maps to Crunch in the Hadoop ecosystem. I think Dataflows is
 more than that but yeah that seems to be some of the 'language'. It is
 similar in that it is a distributed collection abstraction.
 
 
 
 -- 
 Dean Wampler, Ph.D.
 Typesafe
 @deanwampler
 http://typesafe.com
 http://polyglotprogramming.com


Re: Spark vs Google cloud dataflow

2014-06-27 Thread Marco Shaw
Sorry. Never mind...  I guess that's what Summingbird is all about. Never 
heard of it. 

 On Jun 27, 2014, at 7:10 PM, Marco Shaw marco.s...@gmail.com wrote:
 
 Dean: Some interesting information... Do you know where I can read more about 
 these coming changes to Scalding/Cascading?
 
 On Jun 27, 2014, at 9:40 AM, Dean Wampler deanwamp...@gmail.com wrote:
 
 ... and to be clear on the point, Summingbird is not limited to MapReduce. 
 It abstracts over Scalding (which abstracts over Cascading, which is being 
 moved from MR to Spark) and over Storm for event processing.
 
 
 On Fri, Jun 27, 2014 at 7:16 AM, Sean Owen so...@cloudera.com wrote:
 On Thu, Jun 26, 2014 at 9:15 AM, Aureliano Buendia buendia...@gmail.com 
 wrote:
  Summingbird is for map/reduce. Dataflow is the third generation of 
  google's
  map/reduce, and it generalizes map/reduce the way Spark does. See more 
  about
  this here: http://youtu.be/wtLJPvx7-ys?t=2h37m8s
 
 Yes, my point was that Summingbird is similar in that it is a
 higher-level service for batch/streaming computation, not that it is
 similar for being MapReduce-based.
 
  It seems Dataflow is based on this paper:
  http://pages.cs.wisc.edu/~akella/CS838/F12/838-CloudPapers/FlumeJava.pdf
 
 FlumeJava maps to Crunch in the Hadoop ecosystem. I think Dataflows is
 more than that but yeah that seems to be some of the 'language'. It is
 similar in that it is a distributed collection abstraction.
 
 
 
 -- 
 Dean Wampler, Ph.D.
 Typesafe
 @deanwampler
 http://typesafe.com
 http://polyglotprogramming.com


Re: Spark vs Google cloud dataflow

2014-06-27 Thread Khanderao Kand
DataFlow is based on two papers, MillWheel for Stream processing and
FlumeJava for programming optimization and abstraction.

Millwheel http://research.google.com/pubs/pub41378.html
FlumeJava http://dl.acm.org/citation.cfm?id=1806638

Here is my blog entry on this
http://texploration.wordpress.com/2014/06/26/google-dataflow-service-to-fight-against-amazon-kinesis/




On Fri, Jun 27, 2014 at 5:16 AM, Sean Owen so...@cloudera.com wrote:

 On Thu, Jun 26, 2014 at 9:15 AM, Aureliano Buendia buendia...@gmail.com
 wrote:
  Summingbird is for map/reduce. Dataflow is the third generation of
 google's
  map/reduce, and it generalizes map/reduce the way Spark does. See more
 about
  this here: http://youtu.be/wtLJPvx7-ys?t=2h37m8s

 Yes, my point was that Summingbird is similar in that it is a
 higher-level service for batch/streaming computation, not that it is
 similar for being MapReduce-based.

  It seems Dataflow is based on this paper:
  http://pages.cs.wisc.edu/~akella/CS838/F12/838-CloudPapers/FlumeJava.pdf

 FlumeJava maps to Crunch in the Hadoop ecosystem. I think Dataflows is
 more than that but yeah that seems to be some of the 'language'. It is
 similar in that it is a distributed collection abstraction.



hadoop + yarn + spark

2014-06-27 Thread sdeb
Hello,

I have installed Spark on top of Hadoop + YARN.
When I launch the pyspark shell and try to compute something I get this error:

Error from python worker:
  /usr/bin/python: No module named pyspark

The pyspark module should be there; do I have to put an external link to it?

--Sanghamitra.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/hadoop-yarn-spark-tp8466.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Anybody changed their mind about going to the Spark Summit 2014

2014-06-27 Thread Cesar Arevalo
Hi All:

I was wondering if anybody had bought a ticket for the upcoming Spark
Summit 2014 this coming week and had changed their mind about going.

Let me know; since it has sold out and I can't buy a ticket anymore, I
would be interested in buying it.

Best,
-- 
Cesar Arevalo
Software Engineer ❘ Zephyr Health
450 Mission Street, Suite #201 ❘ San Francisco, CA 94105
m: +1 415-571-7687 ❘ s: arevalocesar | t: @zephyrhealth
https://twitter.com/zephyrhealth
o: +1 415-529-7649 ❘ f: +1 415-520-9288
http://www.zephyrhealth.com


Re: Integrate spark-shell into officially supported web ui/api plug-in? What do you think?

2014-06-27 Thread Peng Cheng
That would be really cool with IPython, but I'm still wondering if all
language features are supported; I need these 2 in particular:
1. importing classes and an ILoop from external jars (so I can point it to
SparkILoop or the Spark-binding ILoop of Apache Mahout instead of Scala's default
ILoop)
2. implicit typecasts/wrappers and implicit variables (widely used in
SparkContext.scala)
I'll be able to start experimenting immediately if someone can confirm
these features.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Integrate-spark-shell-into-officially-supported-web-ui-api-plug-in-What-do-you-think-tp8447p8469.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Interconnect benchmarking

2014-06-27 Thread danilopds
Hi,
According to the research paper below by Matei Zaharia, Spark's creator,
http://people.csail.mit.edu/matei/papers/2013/sosp_spark_streaming.pdf

He says on page 10 that:
Grep is network-bound due to the cost to replicate the input data to
multiple nodes.

So,
I guess that can be a good initial recommendation.

But I would like to know about other workloads too.
Best Regards.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Interconnect-benchmarking-tp8467p8470.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Interconnect benchmarking

2014-06-27 Thread Aaron Davidson
A simple throughput test is also repartition()ing a large RDD. This also
stresses the disks, though, so you might try to mount your spark temporary
directory as a ramfs.


On Fri, Jun 27, 2014 at 5:57 PM, danilopds danilob...@gmail.com wrote:

 Hi,
 According with the research paper bellow of Mathei Zaharia, Spark's
 creator,
 http://people.csail.mit.edu/matei/papers/2013/sosp_spark_streaming.pdf

 He says on page 10 that:
 Grep is network-bound due to the cost to replicate the input data to
 multiple nodes.

 So,
 I guess a can be a good initial recommendation.

 But I would like to know others workloads too.
 Best Regards.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Interconnect-benchmarking-tp8467p8470.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
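
A minimal sketch of that kind of repartition() throughput test, run from the spark-shell where sc is already defined (sizes and partition counts are arbitrary placeholders):

// Sketch only: build a sizeable RDD, force a full shuffle, and time it.
val data  = sc.parallelize(1L to 100000000L, 200).map(i => (i, i))
val start = System.currentTimeMillis()
data.repartition(400).count()
println(s"shuffle took ${System.currentTimeMillis() - start} ms")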



Re: Spark job tracker.

2014-06-27 Thread abhiguruvayya
I know this is a very trivial question to ask, but I'm a complete newbie to
this stuff, so I don't have any clue about it. Any help is much appreciated.

For example, if I have a class like the one below, when I run it through the
command line I want to see progress status, something like:

10% completed...
30% completed...
100% completed...Job done!

I am using spark 1.0 on yarn and using Java API.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class MyJavaWordCount {
  public static void main(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: MyJavaWordCount <master> <file>");
      System.exit(1);
    }

    System.out.println("args[0]: master=" + args[0]);
    System.out.println("args[1]: file=" + args[1]);

    JavaSparkContext ctx = new JavaSparkContext(
        args[0],
        "MyJavaWordCount",
        System.getenv("SPARK_HOME"),
        System.getenv("SPARK_EXAMPLES_JAR"));
    JavaRDD<String> lines = ctx.textFile(args[1], 1);

    // input: a line of text, output: the individual words
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String s) {
        return Arrays.asList(s.split(" "));
      }
    });

    // input: a word, output: a (word, 1) pair
    JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
      }
    });

    // sum the counts per word
    JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    });

    List<Tuple2<String, Integer>> output = counts.collect();
    for (Tuple2<String, Integer> tuple : output) {
      System.out.println(tuple._1 + ": " + tuple._2);
    }
    System.exit(0);
  }
}



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p8472.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
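
One way to get a rough progress readout from the driver is to register a SparkListener; below is a minimal Scala sketch (the question uses the Java API, but the listener is easiest to show in Scala, and the exact listener event fields should be checked against the Spark 1.0 scaladoc):

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}

// Sketch only: print a running count of finished tasks and each completed stage.
class ProgressListener extends SparkListener {
  @volatile private var finishedTasks = 0
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    finishedTasks += 1
    println(s"tasks finished so far: $finishedTasks")
  }
  override def onStageCompleted(stage: SparkListenerStageCompleted): Unit =
    println(s"stage ${stage.stageInfo.stageId} completed")
}

sc.addSparkListener(new ProgressListener)   // register before submitting work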


HBase 0.96+ with Spark 1.0+

2014-06-27 Thread Stephen Boesch
The present trunk is built and tested against HBase 0.94.


I have tried various combinations of versions of HBase 0.96+ and Spark 1.0+
and all end up with

14/06/27 20:11:15 INFO HttpServer: Starting HTTP Server
[error] (run-main-0) java.lang.SecurityException: class
javax.servlet.FilterRegistration's signer information does not match
signer information of other classes in the same package
java.lang.SecurityException: class javax.servlet.FilterRegistration's
signer information does not match signer information of other classes in
the same package
at java.lang.ClassLoader.checkCerts(ClassLoader.java:952)


I have tried a number of different ways to exclude javax.servlet related
jars. But none have avoided this error.

Anyone have a (small-ish) build.sbt that works with later versions of HBase?
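
Not a full answer, but the usual workaround for this signer-information clash is to exclude the servlet-api/Jetty artifacts that HBase pulls in. A build.sbt fragment as a sketch; the HBase version, module list and exclusion rules are assumptions to adapt:

// Sketch only: strip the conflicting javax.servlet / mortbay jetty jars from HBase.
val hbaseExcludes = Seq(
  ExclusionRule(organization = "javax.servlet"),
  ExclusionRule(organization = "org.mortbay.jetty")
)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % "1.0.0" % "provided",
  "org.apache.hbase" %  "hbase-client" % "0.96.2-hadoop2" excludeAll (hbaseExcludes: _*),
  "org.apache.hbase" %  "hbase-common" % "0.96.2-hadoop2" excludeAll (hbaseExcludes: _*),
  "org.apache.hbase" %  "hbase-server" % "0.96.2-hadoop2" excludeAll (hbaseExcludes: _*)
)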


Re: hadoop + yarn + spark

2014-06-27 Thread Patrick Wendell
Hi There,

There is an issue with PySpark-on-YARN that requires users to build with
Java 6. The issue has to do with how Java 6 and 7 package jar files
differently.

Can you try building spark with Java 6 and trying again?

- Patrick

On Fri, Jun 27, 2014 at 5:00 PM, sdeb sangha...@gmail.com wrote:
 Hello,

 I have installed spark on top of hadoop + yarn.
 when I launch the pyspark shell  try to compute something I get this error.

 Error from python worker:
   /usr/bin/python: No module named pyspark

 The pyspark module should be there, do I have to put an external link to it?

 --Sanghamitra.




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/hadoop-yarn-spark-tp8466.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.


Distribute data from Kafka evenly on cluster

2014-06-27 Thread Tobias Pfeiffer
Hi,

I have a number of questions using the Kafka receiver of Spark
Streaming. Maybe someone has some more experience with that and can
help me out.

I have set up an environment for getting to know Spark, consisting of
- a Mesos cluster with 3 only-slaves and 3 master-and-slaves,
- 2 Kafka nodes,
- 3 Zookeeper nodes providing service to both Kafka and Mesos.

My Kafka cluster has only one topic with one partition (replicated to
both nodes). When I start my Kafka receiver, it successfully connects
to Kafka and does the processing, but it seems as if the (expensive)
function in the final foreachRDD(...) is only executed on one node of
my cluster, which is not what I had in mind when setting up the
cluster ;-)

So first, I was wondering about the parameter `topics: Map[String,
Int]` to KafkaUtils.createStream(). Apparently it controls how many
connections are made from my cluster nodes to Kafka. The Kafka doc at
https://kafka.apache.org/documentation.html#introduction says each
message published to a topic is delivered to one consumer instance
within each subscribing consumer group and If all the consumer
instances have the same consumer group, then this works just like a
traditional queue balancing load over the consumers.

The Kafka docs *also* say: Note however that there cannot be more
consumer instances than partitions. This seems to imply that with
only one partition, increasing the number in my Map should have no
effect.

However, if I increase the number of streams for my one topic in my
`topics` Map, I actually *do* see that the task in my foreachRDD(...)
call is now executed on multiple nodes. Maybe it's more of a Kafka
question than a Spark one, but can anyone explain this to me? Should I
always have more Kafka partitions than Mesos cluster nodes?

So, assuming that changing the number in that Map is not what I want
(although I don't know if it is), I tried to use
.repartition(numOfClusterNodes) (which doesn't seem right if I want to
add and remove Mesos nodes on demand). This *also* did spread the
foreachRDD(...) action evenly – however, the function never seems to
terminate, so I never get to process the next interval in the stream.
A similar behavior can be observed when running locally, not on the
cluster, then the program will not exit but instead hang after
everything else has shut down. Any hints concerning this issue?

Thanks
Tobias
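
For reference, a minimal sketch of the receiver setup being discussed; the ZooKeeper address, group id, topic name and thread count are placeholders. The Int in the topics map is the number of consumer threads started inside the single receiver, which is separate from how many cluster nodes end up doing the downstream work:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch only: one receiver reading "mytopic" with 3 consumer threads.
val ssc    = new StreamingContext(sc, Seconds(10))
val topics = Map("mytopic" -> 3)   // topic -> number of consumer threads
val stream = KafkaUtils.createStream(ssc, "zk1:2181", "my-consumer-group", topics)

stream.repartition(6).foreachRDD { rdd =>
  // repartition() before the expensive work spreads it over more executors,
  // even when the data arrives through a single receiver.
  println(s"got ${rdd.count()} messages")
}

ssc.start()
ssc.awaitTermination()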