Spark Streaming to capture packets from interface
Hi, I am new to Spark. Is it possible to capture live packets from a network interface through Spark Streaming? Is there a library or any built-in classes to bind to a network interface directly? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-to-capture-packets-from-interface-tp8399.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Map with filter on JavaRdd
Hi All, Is it possible to map and filter a JavaRDD in a single operation? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Map-with-filter-on-JavaRdd-tp8401.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Map with filter on JavaRdd
It effectively happens in a single operation. You can write the map and filter separately, but the stages are pipelined together where possible, so you will see only one stage in the output of your application. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Fri, Jun 27, 2014 at 12:12 PM, ajay garg ajay.g...@mobileum.com wrote: Hi All, Is it possible to map and filter a javardd in a single operation? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Map-with-filter-on-JavaRdd-tp8401.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
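To make the pipelining concrete, here is a minimal Scala sketch (the JavaRDD API exposes the same map and filter methods); both calls are lazy, and when the action runs Spark executes them back to back over each partition within a single stage, without materializing an intermediate RDD:

    val numbers = sc.parallelize(1 to 100)  // sc: an existing SparkContext

    // Written as two transformations, but pipelined into one stage when
    // count() triggers the job; each partition is mapped and filtered in one pass.
    val evensDoubled = numbers.map(_ * 2).filter(_ % 4 == 0)
    println(evensDoubled.count())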
Re: org.jboss.netty.channel.ChannelException: Failed to bind to: master/1xx.xx..xx:0
Hi Akhil, The IP is correct and is able to start the workers when we start it as a java command. It's becoming 192.168.125.174:0 when we call from the scripts. Thanks Regards, Meethu M On Friday, 27 June 2014 1:49 PM, Akhil Das ak...@sigmoidanalytics.com wrote: why is it binding to port 0? 192.168.125.174:0 :/ Check the ip address of that master machine (ifconfig) looks like the ip address has been changed (hoping you are running these machines on a LAN) Thanks Best Regards On Fri, Jun 27, 2014 at 12:00 PM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi all, My Spark (standalone mode) cluster was running fine till yesterday. But now I am getting the following exception when I run start-slaves.sh or start-all.sh slave3: failed to launch org.apache.spark.deploy.worker.Worker: slave3: at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) slave3: at java.lang.Thread.run(Thread.java:662) The log files have the following lines. 14/06/27 11:06:30 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/06/27 11:06:30 INFO SecurityManager: Changing view acls to: hduser 14/06/27 11:06:30 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hduser) 14/06/27 11:06:30 INFO Slf4jLogger: Slf4jLogger started 14/06/27 11:06:30 INFO Remoting: Starting remoting Exception in thread main org.jboss.netty.channel.ChannelException: Failed to bind to: master/192.168.125.174:0 at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272) ... Caused by: java.net.BindException: Cannot assign requested address ... I saw the same error reported before and have tried the following solutions: setting the variable SPARK_LOCAL_IP and changing SPARK_MASTER_PORT to a different number, but nothing is working. When I try to start the worker from the respective machines using the following java command, it runs without any exception java -cp ::/usr/local/spark-1.0.0/conf:/usr/local/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop1.2.1.jar -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://:master:7077 Could somebody please suggest a solution? Thanks Regards, Meethu M
Issue in using classes with constructor as vertex attribute in graphx
Hi, I have a scenario where I have a class X with constructor parameters (RDD, Double). When I initialize the class object (call it x1) with the corresponding RDD and Double value and put it as a vertex attribute in a graph, I lose my RDD value; the Double value remains intact. I tried accessing the RDD through the instance variable (x1) at the same time and I see it is intact there, but for some reason it is not available when I take the graph vertex attribute and access the RDD. Please help me understand which concept I am missing here, and what the correct way to do this is. Regards, Harsh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issue-in-using-classes-with-constructor-as-vertex-attribute-in-graphx-tp8407.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: [GraphX] Cast error when comparing a vertex attribute after its type has changed
Thanks for having corrected this bug! The fix version is marked as 1.1.0 ( SPARK-1552 https://issues.apache.org/jira/browse/SPARK-1552 ). I have tested my code snippet with Spark 1.0.0 (Scala 2.10.4) and it works. I don't know if it's important to mention it. Pierre-Alexandre -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Cast-error-when-comparing-a-vertex-attribute-after-its-type-has-changed-tp4119p8408.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Map with filter on JavaRdd
Thanks Mayur for the clarification. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Map-with-filter-on-JavaRdd-tp8401p8410.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Improving Spark multithreaded performance?
I have not used this, only watched a presentation on it at Spark Summit 2013. https://github.com/radlab/sparrow https://spark-summit.org/talk/ousterhout-next-generation-spark-scheduling-with-sparrow/ This is pure conjecture based on your high scheduling latency and the size of your cluster, but it seems like one way to look at the problem. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Improving-Spark-multithreaded-performance-tp8359p8411.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Fine-grained mesos execution hangs on Debian 7.4
Hello Sebastien, it is not working with the 1.0 branch either. I decided to compile spark from source precisely because of the [SPARK-2204] fix, because before that I couldn't get fine-grained working at all. Now it works fine if the cluster is only composed of Ubuntu 14.04 nodes, and when I introduce the Debian 7.4 nodes they hang like described. I tested both the master (cloned with the [SPARK-2204] fix already inside) and the 1.0 branch with that commit cherry-picked inside. The behaviour is the same: whatever the reason is, it has not been introduced after the 1.0 release. Did anybody else test fine-grained with a Debian 7 or 7.4? 2014-06-26 19:23 GMT+00:00 Sébastien Rainville sebastienrainvi...@gmail.com : Hello Federico, is it working with the 1.0 branch? In either branch, make sure that you have this commit: https://github.com/apache/spark/commit/1132e472eca1a00c2ce10d2f84e8f0e79a5193d3 I never saw the behavior you are describing, but that commit is important if you are running in fine-grained mode, and it was merged only yesterday. - Sebastien On Thu, Jun 26, 2014 at 12:11 PM, Fedechicco fedechi...@gmail.com wrote: Hello, as from object, when I run scala spark-shell on our mesos (0.19) cluster some spark slaves just hang at the end of the staging phase for any given elaboration. The cluster has mixed OSes (Ubuntu 14.04 / Debian 7.4), but if I run the same shell and commands using coarse grained mode everything works just fine. I'm using a spark 1.1.0-SNAPSHOT built from sources (pulled today from git), on openjdk-7. Sadly I can't get any error message from the sandboxes for the hanging slaves, everything seems in order, just stuck. Any suggestion on how to debug this? thanks, Federico
Re: Spark vs Google cloud dataflow
My experience is that gaining 20 spot instances accounts for a tiny fraction of the total time of provisioning a cluster with spark-ec2. This is not (solely) an AWS issue. -- Martin Goodson | VP Data Science (0)20 3397 1240 On Thu, Jun 26, 2014 at 10:14 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Hmm, I remember a discussion on here about how the way in which spark-ec2 rsyncs stuff to the cluster for setup could be improved, and I’m assuming there are other such improvements to be made. Perhaps those improvements don’t matter much when compared to EC2 instance launch times, but I’m not sure. On Thu, Jun 26, 2014 at 4:48 PM, Aureliano Buendia buendia...@gmail.com wrote: On Thu, Jun 26, 2014 at 9:42 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: That’s technically true, but I’d be surprised if there wasn’t a lot of room for improvement in spark-ec2 regarding cluster launch+config times. Unfortunately, this is not a Spark issue, but an AWS one. Starting a few months ago, Amazon AWS services have been having bigger and bigger lags. Indeed, the default timeout hard-coded in spark-ec2 is no longer able to launch the cluster successfully, and many people here reported that they had to increase it.
Re: Spark vs Google cloud dataflow
On Thu, Jun 26, 2014 at 9:15 AM, Aureliano Buendia buendia...@gmail.com wrote: Summingbird is for map/reduce. Dataflow is the third generation of google's map/reduce, and it generalizes map/reduce the way Spark does. See more about this here: http://youtu.be/wtLJPvx7-ys?t=2h37m8s Yes, my point was that Summingbird is similar in that it is a higher-level service for batch/streaming computation, not that it is similar for being MapReduce-based. It seems Dataflow is based on this paper: http://pages.cs.wisc.edu/~akella/CS838/F12/838-CloudPapers/FlumeJava.pdf FlumeJava maps to Crunch in the Hadoop ecosystem. I think Dataflows is more than that but yeah that seems to be some of the 'language'. It is similar in that it is a distributed collection abstraction.
Re: Spark vs Google cloud dataflow
... and to be clear on the point, Summingbird is not limited to MapReduce. It abstracts over Scalding (which abstracts over Cascading, which is being moved from MR to Spark) and over Storm for event processing. On Fri, Jun 27, 2014 at 7:16 AM, Sean Owen so...@cloudera.com wrote: On Thu, Jun 26, 2014 at 9:15 AM, Aureliano Buendia buendia...@gmail.com wrote: Summingbird is for map/reduce. Dataflow is the third generation of google's map/reduce, and it generalizes map/reduce the way Spark does. See more about this here: http://youtu.be/wtLJPvx7-ys?t=2h37m8s Yes, my point was that Summingbird is similar in that it is a higher-level service for batch/streaming computation, not that it is similar for being MapReduce-based. It seems Dataflow is based on this paper: http://pages.cs.wisc.edu/~akella/CS838/F12/838-CloudPapers/FlumeJava.pdf FlumeJava maps to Crunch in the Hadoop ecosystem. I think Dataflows is more than that but yeah that seems to be some of the 'language'. It is similar in that it is a distributed collection abstraction. -- Dean Wampler, Ph.D. Typesafe @deanwampler http://typesafe.com http://polyglotprogramming.com
How to use .newAPIHadoopRDD() from Java (w/ Cassandra)
Hello! I have just started trying out Spark to see if it fits my needs, but I am running into some issues when trying to port the CassandraCQLTest.scala example into Java. The specific errors etc. that I encounter can be seen here: http://stackoverflow.com/questions/24450540/how-to-use-sparks-newapihadooprdd-java-equivalent-of-scalas-classof where I have also asked the same question. Any pointers on how to use .newAPIHadoopRDD() and CqlPagingInputFormat from Java are greatly appreciated! (Either here or on Stack Overflow) -- Best regards, Martin Gammelsæter
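For reference, the call being ported looks roughly like this in the Scala example (reconstructed from memory of CassandraCQLTest, so double-check against the actual file); in Java the same method is called on the JavaSparkContext, passing the Hadoop Configuration plus .class literals for the input format, key and value types:

    import java.nio.ByteBuffer
    import java.util.{Map => JMap}

    import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
    import org.apache.hadoop.mapreduce.Job

    // sc is an existing SparkContext; the Cassandra host, port, keyspace and
    // column family settings on the job configuration are omitted here.
    val job = new Job()
    val casRdd = sc.newAPIHadoopRDD(
      job.getConfiguration(),
      classOf[CqlPagingInputFormat],
      classOf[JMap[String, ByteBuffer]],  // key type
      classOf[JMap[String, ByteBuffer]])  // value type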
Re: Spark standalone network configuration problems
I put the settings as you specified in spark-env.sh for the master. When I run start-all.sh, the web UI shows both the worker on the master (machine1) and the slave worker (machine2) as ALIVE and ready, with the master URL at spark://192.168.1.101. However, when I run spark-submit, it immediately crashes with py4j.protocol.Py4JJavaError14/06/27 09:01:32 ERROR Remoting: Remoting error: [Startup failed] akka.remote.RemoteTransportException: Startup failed [...] org.jboss.netty.channel.ChannelException: Failed to bind to /192.168.1.101:5060 [...] java.net.BindException: Address already in use. [...] This seems entirely contrary to intuition; why would Spark be unable to bind to the exact IP:port set for the master? On 6/27/14, 1:54 AM, Akhil Das wrote: Hi Shannon, How about a setting like the following? (just removed the quotes) export SPARK_MASTER_IP=192.168.1.101 export SPARK_MASTER_PORT=5060 #export SPARK_LOCAL_IP=127.0.0.1 Not sure whats happening in your case, it could be that your system is not able to bind to 192.168.1.101 address. What is the spark:// master url that you are seeing there in the webUI? (It should be spark://192.168.1.101:7077 in your case). Thanks Best Regards On Fri, Jun 27, 2014 at 5:47 AM, Shannon Quinn squ...@gatech.edu mailto:squ...@gatech.edu wrote: In the interest of completeness, this is how I invoke spark: [on master] sbin/start-all.sh spark-submit --py-files extra.py main.py iPhone'd On Jun 26, 2014, at 17:29, Shannon Quinn squ...@gatech.edu mailto:squ...@gatech.edu wrote: My *best guess* (please correct me if I'm wrong) is that the master (machine1) is sending the command to the worker (machine2) with the localhost argument as-is; that is, machine2 isn't doing any weird address conversion on its end. Consequently, I've been focusing on the settings of the master/machine1. But I haven't found anything to indicate where the localhost argument could be coming from. /etc/hosts lists only 127.0.0.1 as localhost; spark-defaults.conf list spark.master as the full IP address (not 127.0.0.1); spark-env.sh on the master also lists the full IP under SPARK_MASTER_IP. The *only* place on the master where it's associated with localhost is SPARK_LOCAL_IP. In looking at the logs of the worker spawned on master, it's also receiving a spark://localhost:5060 argument, but since it resides on the master that works fine. Is it possible that the master is, for some reason, passing spark://{SPARK_LOCAL_IP}:5060 to the workers? That was my motivation behind commenting out SPARK_LOCAL_IP; however, that's when the master crashes immediately due to the address already being in use. Any ideas? Thanks! Shannon On 6/26/14, 10:14 AM, Akhil Das wrote: Can you paste your spark-env.sh file? Thanks Best Regards On Thu, Jun 26, 2014 at 7:01 PM, Shannon Quinn squ...@gatech.edu mailto:squ...@gatech.edu wrote: Both /etc/hosts have each other's IP addresses in them. Telneting from machine2 to machine1 on port 5060 works just fine. 
Here's the output of lsof: user@machine1:~/spark/spark-1.0.0-bin-hadoop2$ mailto:user@machine1:%7E/spark/spark-1.0.0-bin-hadoop2$ lsof -i:5060 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java23985 user 30u IPv6 11092354 0t0 TCP machine1:sip (LISTEN) java23985 user 40u IPv6 11099560 0t0 TCP machine1:sip-machine1:48315 (ESTABLISHED) java23985 user 52u IPv6 11100405 0t0 TCP machine1:sip-machine2:54476 (ESTABLISHED) java24157 user 40u IPv6 11092413 0t0 TCP machine1:48315-machine1:sip (ESTABLISHED) Ubuntu seems to recognize 5060 as the standard port for sip; it's not actually running anything there besides Spark, it just does a s/5060/sip/g. Is there something to the fact that every time I comment out SPARK_LOCAL_IP in spark-env, it crashes immediately upon spark-submit due to the address already being in use? Or am I barking up the wrong tree on that one? Thanks again for all your help; I hope we can knock this one out. Shannon On 6/26/14, 9:13 AM, Akhil Das wrote: Do you have ip machine1 in your workers /etc/hosts also? If so try telneting from your machine2 to machine1 on port 5060. Also make sure nothing else is running on port 5060 other than Spark (*/lsof -i:5060/*) Thanks Best Regards On Thu, Jun 26, 2014 at 6:35 PM, Shannon Quinn squ...@gatech.edu mailto:squ...@gatech.edu wrote: Still running into the same problem. /etc/hosts on the master says 127.0.0.1localhost ip machine1 ip is the
Re: Spark standalone network configuration problems
No joy, unfortunately. Same issue; see my previous email--still crashes with address already in use. On 6/27/14, 1:54 AM, sujeetv wrote: Try to explicitly set set the spark.driver.host property to the master's IP. Sujeet -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-network-configuration-problems-tp8304p8396.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark standalone network configuration problems
Sorry, master spark URL in the web UI is *spark://192.168.1.101:5060*, exactly as configured. On 6/27/14, 9:07 AM, Shannon Quinn wrote: I put the settings as you specified in spark-env.sh for the master. When I run start-all.sh, the web UI shows both the worker on the master (machine1) and the slave worker (machine2) as ALIVE and ready, with the master URL at spark://192.168.1.101. However, when I run spark-submit, it immediately crashes with py4j.protocol.Py4JJavaError14/06/27 09:01:32 ERROR Remoting: Remoting error: [Startup failed] akka.remote.RemoteTransportException: Startup failed [...] org.jboss.netty.channel.ChannelException: Failed to bind to /192.168.1.101:5060 [...] java.net.BindException: Address already in use. [...] This seems entirely contrary to intuition; why would Spark be unable to bind to the exact IP:port set for the master? On 6/27/14, 1:54 AM, Akhil Das wrote: Hi Shannon, How about a setting like the following? (just removed the quotes) export SPARK_MASTER_IP=192.168.1.101 export SPARK_MASTER_PORT=5060 #export SPARK_LOCAL_IP=127.0.0.1 Not sure whats happening in your case, it could be that your system is not able to bind to 192.168.1.101 address. What is the spark:// master url that you are seeing there in the webUI? (It should be spark://192.168.1.101:7077 in your case). Thanks Best Regards On Fri, Jun 27, 2014 at 5:47 AM, Shannon Quinn squ...@gatech.edu mailto:squ...@gatech.edu wrote: In the interest of completeness, this is how I invoke spark: [on master] sbin/start-all.sh spark-submit --py-files extra.py main.py iPhone'd On Jun 26, 2014, at 17:29, Shannon Quinn squ...@gatech.edu mailto:squ...@gatech.edu wrote: My *best guess* (please correct me if I'm wrong) is that the master (machine1) is sending the command to the worker (machine2) with the localhost argument as-is; that is, machine2 isn't doing any weird address conversion on its end. Consequently, I've been focusing on the settings of the master/machine1. But I haven't found anything to indicate where the localhost argument could be coming from. /etc/hosts lists only 127.0.0.1 as localhost; spark-defaults.conf list spark.master as the full IP address (not 127.0.0.1); spark-env.sh on the master also lists the full IP under SPARK_MASTER_IP. The *only* place on the master where it's associated with localhost is SPARK_LOCAL_IP. In looking at the logs of the worker spawned on master, it's also receiving a spark://localhost:5060 argument, but since it resides on the master that works fine. Is it possible that the master is, for some reason, passing spark://{SPARK_LOCAL_IP}:5060 to the workers? That was my motivation behind commenting out SPARK_LOCAL_IP; however, that's when the master crashes immediately due to the address already being in use. Any ideas? Thanks! Shannon On 6/26/14, 10:14 AM, Akhil Das wrote: Can you paste your spark-env.sh file? Thanks Best Regards On Thu, Jun 26, 2014 at 7:01 PM, Shannon Quinn squ...@gatech.edu mailto:squ...@gatech.edu wrote: Both /etc/hosts have each other's IP addresses in them. Telneting from machine2 to machine1 on port 5060 works just fine. 
Here's the output of lsof: user@machine1:~/spark/spark-1.0.0-bin-hadoop2$ mailto:user@machine1:%7E/spark/spark-1.0.0-bin-hadoop2$ lsof -i:5060 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java23985 user 30u IPv6 110923540t0 TCP machine1:sip (LISTEN) java23985 user 40u IPv6 110995600t0 TCP machine1:sip-machine1:48315 (ESTABLISHED) java23985 user 52u IPv6 111004050t0 TCP machine1:sip-machine2:54476 (ESTABLISHED) java24157 user 40u IPv6 110924130t0 TCP machine1:48315-machine1:sip (ESTABLISHED) Ubuntu seems to recognize 5060 as the standard port for sip; it's not actually running anything there besides Spark, it just does a s/5060/sip/g. Is there something to the fact that every time I comment out SPARK_LOCAL_IP in spark-env, it crashes immediately upon spark-submit due to the address already being in use? Or am I barking up the wrong tree on that one? Thanks again for all your help; I hope we can knock this one out. Shannon On 6/26/14, 9:13 AM, Akhil Das wrote: Do you have ip machine1 in your workers /etc/hosts also? If so try telneting from your machine2 to machine1 on port 5060. Also make sure nothing else is running on port 5060 other than Spark (*/lsof -i:5060/*) Thanks Best Regards On Thu, Jun 26, 2014 at 6:35 PM, Shannon Quinn squ...@gatech.edu mailto:squ...@gatech.edu wrote: Still running
Spark RDD member of class loses its value when the class is used as a graph attribute
Hi, I have a scenario where I have a class X with constructor parameters (RDD, Double). When I initialize the class object (call it x1) with the corresponding RDD and Double value and *put it as a vertex attribute in a graph*, I lose my RDD value; the Double value remains intact. I tried accessing the RDD through the instance variable (x1) at the same time and I see it is intact there, but for some reason it is not available when I take the graph vertex attribute and access the RDD. Please help me understand which concept I am missing here, and what the correct way to do this is. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-RDD-member-of-class-loses-it-s-value-when-the-class-being-used-as-graph-attribute-tp8420.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Problem when starting Spark Streaming in cluster mode
Hi all, I can start a Spark Streaming app in client mode on a pseudo-standalone cluster on my local machine. However, when I try to start it in cluster mode, it always got the following exception on the driver: Exception in thread main akka.ConfigurationException: Could not start logger due to [akka.ConfigurationException: Logger specified in config can't be loaded [akka.event.slf4j.Slf4jLogger] due to [akka.event.Logging$LoggerInitializationException: Logger log1-Slf4jLogger did not respond with LoggerInitialized, sent instead [TIMEOUT]]] Can someone help? Thanks, siyuan
Re: ElasticSearch enrich
Another question. In the foreachRDD I will initialize the JobConf, but in this place how can I get information from the items? I have an identifier in the data which identify the required ES index (so how can I set dynamic index in the foreachRDD) ? b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau hol...@pigscanfly.ca wrote: Just your luck I happened to be working on that very talk today :) Let me know how your experiences with Elasticsearch Spark go :) On Thu, Jun 26, 2014 at 3:17 PM, boci boci.b...@gmail.com wrote: Wow, thanks your fast answer, it's help a lot... b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau hol...@pigscanfly.ca wrote: Hi b0c1, I have an example of how to do this in the repo for my talk as well, the specific example is at https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and then call saveAsHadoopDataset on the RDD that gets passed into the function we provide to foreachRDD. e.g. stream.foreachRDD{(data, time) = val jobconf = ... data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf) } Hope that helps :) Cheers, Holden :) On Thu, Jun 26, 2014 at 2:23 PM, boci boci.b...@gmail.com wrote: Thanks. I without local option I can connect with es remote, now I only have one problem. How can I use elasticsearch-hadoop with spark streaming? I mean DStream doesn't have saveAsHadoopFiles method, my second problem the output index is depend by the input data. Thanks -- Skype: boci13, Hangout: boci.b...@gmail.com On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath nick.pentre...@gmail.com wrote: You can just add elasticsearch-hadoop as a dependency to your project to user the ESInputFormat and ESOutputFormat ( https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics here: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html For testing, yes I think you will need to start ES in local mode (just ./bin/elasticsearch) and use the default config (host = localhost, port = 9200). On Thu, Jun 26, 2014 at 9:04 AM, boci boci.b...@gmail.com wrote: That's okay, but hadoop has ES integration. what happened if I run saveAsHadoopFile without hadoop (or I must need to pull up hadoop programatically? (if I can)) b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau hol...@pigscanfly.ca wrote: On Wed, Jun 25, 2014 at 4:16 PM, boci boci.b...@gmail.com wrote: Hi guys, thanks the direction now I have some problem/question: - in local (test) mode I want to use ElasticClient.local to create es connection, but in prodution I want to use ElasticClient.remote, to this I want to pass ElasticClient to mapPartitions, or what is the best practices? In this case you probably want to make the ElasticClient inside of mapPartitions (since it isn't serializable) and if you want to use a different client in local mode just have a flag that control what type of client you create. - my stream output is write into elasticsearch. How can I test output.saveAsHadoopFile[ESOutputFormat](-) in local environment? - After store the enriched data into ES, I want to generate aggregated data (EsInputFormat) how can I test it in local? I think the simplest thing to do would be use the same client in mode and just start single node elastic search cluster. 
Thanks guys b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau hol...@pigscanfly.ca wrote: So I'm giving a talk at the Spark summit on using Spark ElasticSearch, but for now if you want to see a simple demo which uses elasticsearch for geo input you can take a look at my quick dirty implementation with TopTweetsInALocation ( https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala ). This approach uses the ESInputFormat which avoids the difficulty of having to manually create ElasticSearch clients. This approach might not work for your data, e.g. if you need to create a query for
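For what it is worth, here is a rough Scala sketch of the foreachRDD + saveAsHadoopDataset pattern Holden describes, with assumed elasticsearch-hadoop settings (es.nodes, es.resource) and a hypothetical stream of Map[String, String] documents; it is not Holden's exact code. On the dynamic-index question: later elasticsearch-hadoop releases document multi-resource writes (an es.resource containing a {fieldName} placeholder), so it is worth checking whether the version you run supports that; otherwise you may need to split the stream by index and write each part with its own JobConf.

    import org.apache.hadoop.io.{MapWritable, NullWritable, Text}
    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.SparkContext._
    import org.elasticsearch.hadoop.mr.EsOutputFormat

    // stream is assumed to be a DStream[Map[String, String]] of documents.
    stream.foreachRDD { rdd =>
      val jobConf = new JobConf(rdd.context.hadoopConfiguration)
      jobConf.set("es.nodes", "localhost:9200")      // assumed ES endpoint
      jobConf.set("es.resource", "myindex/mytype")   // hypothetical index/type
      jobConf.setOutputFormat(classOf[EsOutputFormat])
      jobConf.setOutputKeyClass(classOf[NullWritable])
      jobConf.setOutputValueClass(classOf[MapWritable])

      rdd.map { doc =>
        val writable = new MapWritable()
        doc.foreach { case (k, v) => writable.put(new Text(k), new Text(v)) }
        (NullWritable.get(), writable)
      }.saveAsHadoopDataset(jobConf)
    }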
Re: numpy + pyspark
I too felt the same Nick but I don't have root privileges on the cluster, unfortunately. Are there any alternatives? On 27 June 2014 08:04, Nick Pentreath nick.pentre...@gmail.com wrote: I've not tried this - but numpy is a tricky and complex package with many dependencies on Fortran/C libraries etc. I'd say by the time you figure out correctly deploying numpy in this manner, you may as well have just built it into your cluster bootstrap process, or PSSH install it on each node... On Fri, Jun 27, 2014 at 4:58 PM, Avishek Saha avishek.s...@gmail.com wrote: To clarify I tried it and it almost worked -- but I am getting some problems from the Random module in numpy. If anyone has successfully passed a numpy module (via the --py-files option) to spark-submit then please let me know. Thanks !! Avishek On 26 June 2014 17:45, Avishek Saha avishek.s...@gmail.com wrote: Hi all, Instead of installing numpy in each worker node, is it possible to ship numpy (via --py-files option maybe) while invoking the spark-submit? Thanks, Avishek
Re: numpy + pyspark
Would deploying virtualenv on each directory on the cluster be viable? The dependencies would get tricky but I think this is the sort of situation it's built for. On 6/27/14, 11:06 AM, Avishek Saha wrote: I too felt the same Nick but I don't have root privileges on the cluster, unfortunately. Are there any alternatives? On 27 June 2014 08:04, Nick Pentreath nick.pentre...@gmail.com mailto:nick.pentre...@gmail.com wrote: I've not tried this - but numpy is a tricky and complex package with many dependencies on Fortran/C libraries etc. I'd say by the time you figure out correctly deploying numpy in this manner, you may as well have just built it into your cluster bootstrap process, or PSSH install it on each node... On Fri, Jun 27, 2014 at 4:58 PM, Avishek Saha avishek.s...@gmail.com mailto:avishek.s...@gmail.com wrote: To clarify I tried it and it almost worked -- but I am getting some problems from the Random module in numpy. If anyone has successfully passed a numpy module (via the --py-files option) to spark-submit then please let me know. Thanks !! Avishek On 26 June 2014 17:45, Avishek Saha avishek.s...@gmail.com mailto:avishek.s...@gmail.com wrote: Hi all, Instead of installing numpy in each worker node, is it possible to ship numpy (via --py-files option maybe) while invoking the spark-submit? Thanks, Avishek
Re: numpy + pyspark
I suppose along those lines, there's also Anaconda: https://store.continuum.io/cshop/anaconda/ On 6/27/14, 11:13 AM, Nick Pentreath wrote: Hadoopy uses http://www.pyinstaller.org/ to package things up into an executable that should be runnable without root privileges. It says it support numpy On Fri, Jun 27, 2014 at 5:08 PM, Shannon Quinn squ...@gatech.edu mailto:squ...@gatech.edu wrote: Would deploying virtualenv on each directory on the cluster be viable? The dependencies would get tricky but I think this is the sort of situation it's built for. On 6/27/14, 11:06 AM, Avishek Saha wrote: I too felt the same Nick but I don't have root privileges on the cluster, unfortunately. Are there any alternatives? On 27 June 2014 08:04, Nick Pentreath nick.pentre...@gmail.com mailto:nick.pentre...@gmail.com wrote: I've not tried this - but numpy is a tricky and complex package with many dependencies on Fortran/C libraries etc. I'd say by the time you figure out correctly deploying numpy in this manner, you may as well have just built it into your cluster bootstrap process, or PSSH install it on each node... On Fri, Jun 27, 2014 at 4:58 PM, Avishek Saha avishek.s...@gmail.com mailto:avishek.s...@gmail.com wrote: To clarify I tried it and it almost worked -- but I am getting some problems from the Random module in numpy. If anyone has successfully passed a numpy module (via the --py-files option) to spark-submit then please let me know. Thanks !! Avishek On 26 June 2014 17:45, Avishek Saha avishek.s...@gmail.com mailto:avishek.s...@gmail.com wrote: Hi all, Instead of installing numpy in each worker node, is it possible to ship numpy (via --py-files option maybe) while invoking the spark-submit? Thanks, Avishek
Re: Using CQLSSTableWriter to batch load data from Spark to Cassandra.
I got an answer on SO on this question, basically confirming that the CQLSSTableWrite cannot be used in Spark (at least in the form shown in the code snippet). DataStax filed a bug on that and might get solved on a future version. As you have observed, a single writer can only be used in serial (ConcurrentModificationExceptions will happen if you do not), and creating multiple writers in the JVM fails due to static schema construction within the Cassandra code that the SSTableWriter uses. I'm not aware of any workaround other than to spawn multiple JVMs, each writing to a separate directory. We have filed a Cassandra JIRA ticket to address this issue. https://issues.apache.org/jira/browse/CASSANDRA-7463; - Tupshin Harper http://stackoverflow.com/users/881195/tupshin-harper S.O. question: http://stackoverflow.com/questions/24396902/using-cqlsstablewriter-concurrently/24455785#24455785 On Thu, Jun 26, 2014 at 11:03 AM, Rohit Rai ro...@tuplejump.com wrote: Hi Gerard, What is the version of Spark, Hadoop, Cassandra and Calliope are you using. We never built Calliope to Hadoop2 as we/or our clients don't use Hadoop in their deployments or use it only as the Infra component for Spark in which case H1/H2 doesn't make a difference for them. I know atleast of one case where the user had built Calliope against 2.0 and was using it happily. If you need assistance with it we are here to help. Feel free to reach out to me directly and we can work out a solution for you. Regards, Rohit *Founder CEO, **Tuplejump, Inc.* www.tuplejump.com *The Data Engineering Platform* On Thu, Jun 26, 2014 at 12:44 AM, Gerard Maas gerard.m...@gmail.com wrote: Thanks Nick. We used the CassandraOutputFormat through Calliope. The Calliope API makes the CassandraOutputFormat quite accessible and is cool to work with. It worked fine at prototype level, but we had Hadoop version conflicts when we put it in our Spark environment (Using our Spark assembly compiled with CDH4.4). The conflict seems to be at the Cassandra-all lib level, which is compiled against a different hadoop version (v1). We could not get round that issue. (Any pointers in that direction?) That's why I'm trying the direct CQLSSTableWriter way but it looks blocked as well. -kr, Gerard. On Wed, Jun 25, 2014 at 8:57 PM, Nick Pentreath nick.pentre...@gmail.com wrote: can you not use a Cassandra OutputFormat? Seems they have BulkOutputFormat. An example of using it with Hadoop is here: http://shareitexploreit.blogspot.com/2012/03/bulkloadto-cassandra-with-hadoop.html Using it with Spark will be similar to the examples: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala and https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala On Wed, Jun 25, 2014 at 8:44 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi, (My excuses for the cross-post from SO) I'm trying to create Cassandra SSTables from the results of a batch computation in Spark. 
Ideally, each partition should create the SSTable for the data it holds in order to parallelize the process as much as possible (and probably even stream it to the Cassandra ring as well) After the initial hurdles with the CQLSSTableWriter (like requiring the yaml file), I'm confronted now with this issue: java.lang.RuntimeException: Attempting to load already loaded column family customer.rawts at org.apache.cassandra.config.Schema.load(Schema.java:347) at org.apache.cassandra.config.Schema.load(Schema.java:112) at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336) I'm creating a writer on each parallel partition like this: def store(rdd:RDD[Message]) = { rdd.foreachPartition( msgIterator = { val writer = CQLSSTableWriter.builder() .inDirectory(/tmp/cass) .forTable(schema) .using(insertSttmt).build() msgIterator.foreach(msg = {...}) })} And if I'm reading the exception correctly, I can only create one writer per table in one JVM. Digging a bit further in the code, it looks like the Schema.load(...) singleton enforces that limitation. I guess writings to the writer will not be thread-safe and even if they were the contention that multiple threads will create by having all parallel tasks trying to dump few GB of data to disk at the same time will defeat the purpose of using the SSTables for bulk upload anyway. So, are there ways to use the CQLSSTableWriter concurrently? If not, what is the next best option to load batch data at high throughput in Cassandra? Will the upcoming Spark-Cassandra integration help with this? (ie. should I just sit back, relax and the problem will solve itself?) Thanks, Gerard.
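For readers trying to follow the snippet quoted above: the arrows and string quotes were lost in the archive, so here is a cleaned-up reconstruction (Message, schema and insertSttmt are the original poster's own definitions and are only assumed here). It also shows where the one-writer-per-JVM limitation bites, since each partition builds its own writer:

    import org.apache.cassandra.io.sstable.CQLSSTableWriter
    import org.apache.spark.rdd.RDD

    def store(rdd: RDD[Message]) = {
      rdd.foreachPartition { msgIterator =>
        // Building a writer per partition is what triggers the
        // "Attempting to load already loaded column family" error when
        // several partitions run inside the same JVM.
        val writer = CQLSSTableWriter.builder()
          .inDirectory("/tmp/cass")
          .forTable(schema)      // CQL CREATE TABLE statement
          .using(insertSttmt)    // CQL INSERT statement
          .build()
        msgIterator.foreach(msg => { /* writer.addRow(...) */ })
      }
    }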
Re: Using CQLSSTableWriter to batch load data from Spark to Cassandra.
Hi Rohit, Thanks for your message. We are currently on Spark 0.9.1, Cassandra 2.0.6 and Calliope GA (Would love to try the pre-release version if you want beta testers :-) Our hadoop version is CDH4.4 and of course our spark assembly is compiled against it. We have got really interesting performance results from using Calliope and will probably try to compile it against Hadoop 2. Compared to the DataStax Java driver, out of the box, the Calliope lib gives us ~4.5x insert performance with a higher network and cpu usage (which is what we want in batch insert mode = fast) With additional code optimizations using the DataStax driver, we were able to reduce that gap to 2x but still Calliope was easier and faster to use. Will you be attending the Spark Summit? I'll be around. We'll be in touch in any case :-) -kr, Gerard. On Thu, Jun 26, 2014 at 11:03 AM, Rohit Rai ro...@tuplejump.com wrote: Hi Gerard, What is the version of Spark, Hadoop, Cassandra and Calliope are you using. We never built Calliope to Hadoop2 as we/or our clients don't use Hadoop in their deployments or use it only as the Infra component for Spark in which case H1/H2 doesn't make a difference for them. I know atleast of one case where the user had built Calliope against 2.0 and was using it happily. If you need assistance with it we are here to help. Feel free to reach out to me directly and we can work out a solution for you. Regards, Rohit *Founder CEO, **Tuplejump, Inc.* www.tuplejump.com *The Data Engineering Platform* On Thu, Jun 26, 2014 at 12:44 AM, Gerard Maas gerard.m...@gmail.com wrote: Thanks Nick. We used the CassandraOutputFormat through Calliope. The Calliope API makes the CassandraOutputFormat quite accessible and is cool to work with. It worked fine at prototype level, but we had Hadoop version conflicts when we put it in our Spark environment (Using our Spark assembly compiled with CDH4.4). The conflict seems to be at the Cassandra-all lib level, which is compiled against a different hadoop version (v1). We could not get round that issue. (Any pointers in that direction?) That's why I'm trying the direct CQLSSTableWriter way but it looks blocked as well. -kr, Gerard. On Wed, Jun 25, 2014 at 8:57 PM, Nick Pentreath nick.pentre...@gmail.com wrote: can you not use a Cassandra OutputFormat? Seems they have BulkOutputFormat. An example of using it with Hadoop is here: http://shareitexploreit.blogspot.com/2012/03/bulkloadto-cassandra-with-hadoop.html Using it with Spark will be similar to the examples: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala and https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala On Wed, Jun 25, 2014 at 8:44 PM, Gerard Maas gerard.m...@gmail.com wrote: Hi, (My excuses for the cross-post from SO) I'm trying to create Cassandra SSTables from the results of a batch computation in Spark. 
Ideally, each partition should create the SSTable for the data it holds in order to parallelize the process as much as possible (and probably even stream it to the Cassandra ring as well) After the initial hurdles with the CQLSSTableWriter (like requiring the yaml file), I'm confronted now with this issue: java.lang.RuntimeException: Attempting to load already loaded column family customer.rawts at org.apache.cassandra.config.Schema.load(Schema.java:347) at org.apache.cassandra.config.Schema.load(Schema.java:112) at org.apache.cassandra.io.sstable.CQLSSTableWriter$Builder.forTable(CQLSSTableWriter.java:336) I'm creating a writer on each parallel partition like this: def store(rdd:RDD[Message]) = { rdd.foreachPartition( msgIterator = { val writer = CQLSSTableWriter.builder() .inDirectory(/tmp/cass) .forTable(schema) .using(insertSttmt).build() msgIterator.foreach(msg = {...}) })} And if I'm reading the exception correctly, I can only create one writer per table in one JVM. Digging a bit further in the code, it looks like the Schema.load(...) singleton enforces that limitation. I guess writings to the writer will not be thread-safe and even if they were the contention that multiple threads will create by having all parallel tasks trying to dump few GB of data to disk at the same time will defeat the purpose of using the SSTables for bulk upload anyway. So, are there ways to use the CQLSSTableWriter concurrently? If not, what is the next best option to load batch data at high throughput in Cassandra? Will the upcoming Spark-Cassandra integration help with this? (ie. should I just sit back, relax and the problem will solve itself?) Thanks, Gerard.
Re: Integrate Spark Editor with Hue for source compiled installation of spark/spark-jobServer
So far Spark Job Server does not work with Spark 1.0: https://github.com/ooyala/spark-jobserver So this works only with Spark 0.9 currently: http://gethue.com/get-started-with-spark-deploy-spark-server-and-compute-pi-from-your-web-browser/ Romain Romain On Tue, Jun 24, 2014 at 9:04 AM, Sunita Arvind sunitarv...@gmail.com wrote: Hello Experts, I am attempting to integrate Spark Editor with Hue on CDH5.0.1. I have the spark installation build manually from the sources for spark1.0.0. I am able to integrate this with cloudera manager. Background: --- We have a 3 node VM cluster with CDH5.0.1 We requried spark1.0.0 due to some features in it, so I did a yum remove spark-core spark-master spark-worker spark-python of the default spark0.9.0 and compiled spark1.0.0 from source: Downloaded the spark-trunk from git clone https://github.com/apache/spark.git cd spark SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true ./sbt/sbt assembly The spark-assembly-1.0.0-SNAPSHOT-hadoop2.2.0.jar was built and spark by itself seems to work well. I was even able to run a text file count. Current attempt: Referring to this article - http://gethue.com/a-new-spark-web-ui-spark-app/ Now I am trying to add the Spark editor to Hue. AFAIK, this requires git clone https://github.com/ooyala/spark-jobserver.git cd spark-jobserver sbt re-start This was successful after lot of struggle with the proxy settings. However, is this the job Server itself? Will that mean the job Server has to be manually started. I intend to have the spark editor show up in hue web UI and I am no way close. Can some one please help? Note, the 3 VMs are Linux CentOS. Not sure if setting something like can be expected to work.: [desktop] app_blacklist= Also, I have made the changes to vim . /job-server/src/main/resources/application.conf as recommended, however, I do not expect this to impact hue in any way. Also, I intend to let the editor stay available, not spawn it everytime it is required. Thanks in advance. regards
Re: Map with filter on JavaRdd
If for some reason it would be easier to do your mapping and filtering in a single function, you can also use RDD.flatMap (returning an empty sequence is equivalent to a filter). But unless you have good reason you should have a separate map and filter transform, as Mayur said. On Fri, Jun 27, 2014 at 7:43 AM, ajay garg ajay.g...@mobileum.com wrote: Thanks Mayur for clarification.. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Map-with-filter-on-JavaRdd-tp8401p8410.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
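A small Scala sketch of the two options Daniel describes (JavaRDD has the same map, filter and flatMap methods, with flatMap taking a FlatMapFunction); in the flatMap version, returning an empty collection drops the element, so one function does both the mapping and the filtering:

    // words: an RDD[String], assumed to exist already.

    // Separate map + filter: clearer, and just as efficient because Spark
    // pipelines the two transformations into one stage.
    val longWords = words.map(_.trim).filter(_.length > 3)

    // Single flatMap doing both: an empty Seq filters the element out.
    val longWords2 = words.flatMap { w =>
      val t = w.trim
      if (t.length > 3) Seq(t) else Seq.empty[String]
    }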
scopt.OptionParser
Hi, I tried to develop some code to use Logistic Regression, following the code in BinaryClassification.scala in examples/mllib. My code compiles, but at runtime it complains that the scopt/OptionParser class cannot be found. I have the following import statement in my code: import scopt.OptionParser My sbt file contains the following dependencies: scalaVersion := "2.10.4" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0" libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.0.0" libraryDependencies += "com.github.scopt" %% "scopt" % "3.2.0" resolvers += "Akka Repository" at "http://repo.akka.io/releases/" Is there anything else I need to do to include the OptionParser? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/scopt-OptionParser-tp8436.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
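A likely cause, though not certain from the information given, is that "sbt package" builds a jar containing only your own classes, so the scopt jar is not on the driver/executor classpath at run time even though compilation succeeds. Two common ways around it, sketched with hypothetical class and jar paths:

    # Pass the dependency jar explicitly when submitting (paths are examples):
    spark-submit --class com.example.BinaryClassifier \
      --jars /path/to/scopt_2.10-3.2.0.jar \
      target/scala-2.10/myapp_2.10-0.1.jar

    # Or build a single fat jar with the sbt-assembly plugin and submit that:
    sbt assembly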
Re: Spark job tracker.
Hello Mayur, Are you using the SparkListener interface from the Java API? I tried using it but was unsuccessful, so I need a few more pointers. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p8438.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
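In case a concrete shape helps while waiting for a reply, here is a minimal Scala sketch of a listener; the class name and method body are illustrative, not taken from any existing code. The Java API uses the same org.apache.spark.scheduler.SparkListener interface, but from Java you generally have to supply (possibly empty) implementations for every callback, which may be where it became awkward:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

    // Illustrative listener that reports stage completion.
    class JobProgressTracker extends SparkListener {
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
        val info = stageCompleted.stageInfo
        println("Stage " + info.stageId + " (" + info.name + ") completed")
      }
    }

    // Register it on the SparkContext before running jobs:
    // sc.addSparkListener(new JobProgressTracker())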
Re: Spark standalone network configuration problems
For some reason, commenting out spark.driver.host and spark.driver.port fixed something...and broke something else (or at least revealed another problem). For reference, the only lines I have in my spark-defaults.conf now: spark.app.name myProg spark.masterspark://192.168.1.101:5060 spark.executor.memory 8g spark.files.overwrite true It starts up, but has problems with machine2. For some reason, machine2 is having trouble communicating with *itself*. Here are the worker logs of one of the failures (there are 10 before it quits): Spark assembly has been built with Hive, including Datanucleus jars on classpath 14/06/27 14:55:13 INFO ExecutorRunner: Launch command: java -cp ::/home/spark/spark-1.0.0-bin-hadoop2/conf:/home/spark/spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar -XX:MaxPermSize=128m -Xms8192M -Xmx8192M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@machine1:46378/user/CoarseGrainedScheduler 7 machine2 8 akka.tcp://sparkWorker@machine2:48019/user/Worker app-20140627144512-0001 14/06/27 14:56:54 INFO Worker: Executor app-20140627144512-0001/7 finished with state FAILED message Command exited with code 1 exitStatus 1 14/06/27 14:56:54 INFO LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%40130.49.226.148%3A53561-38#-1924573003] was not delivered. [10] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 14/06/27 14:56:54 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@machine2:48019] - [akka.tcp://sparkExecutor@machine2:60949]: Error [Association failed with [akka.tcp://sparkExecutor@machine2:60949]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@machine2:60949] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: machine2/130.49.226.148:60949 ] 14/06/27 14:56:54 INFO Worker: Asked to launch executor app-20140627144512-0001/8 for Funtown, USA 14/06/27 14:56:54 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@machine2:48019] - [akka.tcp://sparkExecutor@machine2:60949]: Error [Association failed with [akka.tcp://sparkExecutor@machine2:60949]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@machine2:60949] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: machine2/130.49.226.148:60949 ] 14/06/27 14:56:54 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@machine2:48019] - [akka.tcp://sparkExecutor@machine2:60949]: Error [Association failed with [akka.tcp://sparkExecutor@machine2:60949]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@machine2:60949] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: machine2/130.49.226.148:60949 ] Port 48019 on machine2 is indeed open, connected, and listening. Any ideas? Thanks! 
Shannon On 6/27/14, 1:54 AM, sujeetv wrote: Try to explicitly set set the spark.driver.host property to the master's IP. Sujeet -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-network-configuration-problems-tp8304p8396.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark standalone network configuration problems
Looks like your driver is not able to connect to the remote executor on machine2/130.49.226.148:60949. Cn you check if the master machine can route to 130.49.226.148 Sujeet On Fri, Jun 27, 2014 at 12:04 PM, Shannon Quinn squ...@gatech.edu wrote: For some reason, commenting out spark.driver.host and spark.driver.port fixed something...and broke something else (or at least revealed another problem). For reference, the only lines I have in my spark-defaults.conf now: spark.app.name myProg spark.masterspark://192.168.1.101:5060 spark.executor.memory 8g spark.files.overwrite true It starts up, but has problems with machine2. For some reason, machine2 is having trouble communicating with *itself*. Here are the worker logs of one of the failures (there are 10 before it quits): Spark assembly has been built with Hive, including Datanucleus jars on classpath 14/06/27 14:55:13 INFO ExecutorRunner: Launch command: java -cp ::/home/spark/spark-1.0.0-bin-hadoop2/conf:/home/spark/ spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2. 2.0.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus- rdbms-3.2.1.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/ datanucleus-core-3.2.2.jar:/home/spark/spark-1.0.0-bin- hadoop2/lib/datanucleus-api-jdo-3.2.1.jar -XX:MaxPermSize=128m -Xms8192M -Xmx8192M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@machine1:46378/user/CoarseGrainedScheduler 7 machine2 8 akka.tcp://sparkWorker@machine2:48019/user/Worker app-20140627144512-0001 14/06/27 14:56:54 INFO Worker: Executor app-20140627144512-0001/7 finished with state FAILED message Command exited with code 1 exitStatus 1 14/06/27 14:56:54 INFO LocalActorRef: Message [akka.remote.transport. ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/ system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F% 2FsparkWorker%40130.49.226.148%3A53561-38#-1924573003] was not delivered. [10] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
14/06/27 14:56:54 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@machine2:48019] - [akka.tcp://sparkExecutor@machine2:60949]: Error [Association failed with [akka.tcp://sparkExecutor@machine2:60949]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@machine2:60949] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: machine2/130.49.226.148:60949 ] 14/06/27 14:56:54 INFO Worker: Asked to launch executor app-20140627144512-0001/8 for Funtown, USA 14/06/27 14:56:54 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@machine2:48019] - [akka.tcp://sparkExecutor@machine2:60949]: Error [Association failed with [akka.tcp://sparkExecutor@machine2:60949]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@machine2:60949] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: machine2/130.49.226.148:60949 ] 14/06/27 14:56:54 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@machine2:48019] - [akka.tcp://sparkExecutor@machine2:60949]: Error [Association failed with [akka.tcp://sparkExecutor@machine2:60949]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@machine2:60949] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: machine2/130.49.226.148:60949 ] Port 48019 on machine2 is indeed open, connected, and listening. Any ideas? Thanks! Shannon On 6/27/14, 1:54 AM, sujeetv wrote: Try to explicitly set set the spark.driver.host property to the master's IP. Sujeet -- View this message in context: http://apache-spark-user-list. 1001560.n3.nabble.com/Spark-standalone-network-configuration-problems- tp8304p8396.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spark standalone network configuration problems
Apologies; can you advise as to how I would check that? I can certainly SSH from master to machine2. On 6/27/14, 3:22 PM, Sujeet Varakhedi wrote: Looks like your driver is not able to connect to the remote executor on machine2/130.49.226.148:60949 http://130.49.226.148:60949/. Cn you check if the master machine can route to 130.49.226.148 Sujeet On Fri, Jun 27, 2014 at 12:04 PM, Shannon Quinn squ...@gatech.edu mailto:squ...@gatech.edu wrote: For some reason, commenting out spark.driver.host and spark.driver.port fixed something...and broke something else (or at least revealed another problem). For reference, the only lines I have in my spark-defaults.conf now: spark.app.name http://spark.app.name myProg spark.masterspark://192.168.1.101:5060 http://192.168.1.101:5060 spark.executor.memory 8g spark.files.overwrite true It starts up, but has problems with machine2. For some reason, machine2 is having trouble communicating with *itself*. Here are the worker logs of one of the failures (there are 10 before it quits): Spark assembly has been built with Hive, including Datanucleus jars on classpath 14/06/27 14:55:13 INFO ExecutorRunner: Launch command: java -cp ::/home/spark/spark-1.0.0-bin-hadoop2/conf:/home/spark/spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar -XX:MaxPermSize=128m -Xms8192M -Xmx8192M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@machine1:46378/user/CoarseGrainedScheduler 7 machine2 8 akka.tcp://sparkWorker@machine2:48019/user/Worker app-20140627144512-0001 14/06/27 14:56:54 INFO Worker: Executor app-20140627144512-0001/7 finished with state FAILED message Command exited with code 1 exitStatus 1 14/06/27 14:56:54 INFO LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%40130.49.226.148%3A53561-38#-1924573003] was not delivered. [10] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
14/06/27 14:56:54 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@machine2:48019] - [akka.tcp://sparkExecutor@machine2:60949]: Error [Association failed with [akka.tcp://sparkExecutor@machine2:60949]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@machine2:60949] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: machine2/130.49.226.148:60949 http://130.49.226.148:60949 ] 14/06/27 14:56:54 INFO Worker: Asked to launch executor app-20140627144512-0001/8 for Funtown, USA 14/06/27 14:56:54 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@machine2:48019] - [akka.tcp://sparkExecutor@machine2:60949]: Error [Association failed with [akka.tcp://sparkExecutor@machine2:60949]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@machine2:60949] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: machine2/130.49.226.148:60949 http://130.49.226.148:60949 ] 14/06/27 14:56:54 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@machine2:48019] - [akka.tcp://sparkExecutor@machine2:60949]: Error [Association failed with [akka.tcp://sparkExecutor@machine2:60949]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@machine2:60949] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: machine2/130.49.226.148:60949 http://130.49.226.148:60949 ] Port 48019 on machine2 is indeed open, connected, and listening. Any ideas? Thanks! Shannon On 6/27/14, 1:54 AM, sujeetv wrote: Try to explicitly set set the spark.driver.host property to the master's IP. Sujeet -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-network-configuration-problems-tp8304p8396.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: problem when start spark streaming in cluster mode
Hi Siyuan, Can you try this solution? http://stackoverflow.com/questions/21943353/akka-2-3-0-fails-to-load-slf4jeventhandler-class-with-java-lang-classnotfounde Best Date: Fri, 27 Jun 2014 14:18:59 -0400 Subject: problem when start spark streaming in cluster mode From: hsy...@gmail.com To: user@spark.apache.org Hi all, I can start a spark streaming app in Client mode on a Pseudo-standalone cluster on my local machine. However when I tried to start it in Cluster mode. It always got the following exception on the Driver. Exception in thread main akka.ConfigurationException: Could not start logger due to [akka.ConfigurationException: Logger specified in config can't be loaded [akka.event.slf4j.Slf4jLogger] due to [akka.event.Logging$LoggerInitializationException: Logger log1-Slf4jLogger did not respond with LoggerInitialized, sent instead [TIMEOUT]]] Can someone help? Thanks, siyuan
jackson-core-asl jar (1.8.8 vs 1.9.x) conflict with the spark-sql (version 1.x)
Hi: I am using spark to stream data to cassandra and it works fine in local mode. But when I execute the application in a standalone clustered env I got exception included below (java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass). I think this is due to the jackson-core-asl dependency conflict (jackson-core-asl 1.8.8 has the JsonClass but 1.9.x does not). The 1.9.x version is being pulled in by spark-sql project. I tried adding jackson-core-asl 1.8.8 with --jars argument while submitting the application for execution but it did not work. So I created a custom spark build excluding sql project. With this custom spark install I was able to resolve the issue at least on a single node cluster (separate master and worker). If there is an alternate way to resolve this conflicting jar issue without a custom build (eg: configuration to use the user defined jars in the executor class path first), please let me know. Also, is there a comprehensive list of configuration properties available for spark ? Thanks Mans Exception trace TaskSetManager: Loss was due to java.lang.NoClassDefFoundError java.lang.NoClassDefFoundError: org/codehaus/jackson/annotate/JsonClass at org.codehaus.jackson.map.introspect.JacksonAnnotationIntrospector.findDeserializationType(JacksonAnnotationIntrospector.java:524) at org.codehaus.jackson.map.deser.BasicDeserializerFactory.modifyTypeByAnnotation(BasicDeserializerFactory.java:732) at org.codehaus.jackson.map.deser.BasicDeserializerFactory.createCollectionDeserializer(BasicDeserializerFactory.java:229) at org.codehaus.jackson.map.deser.StdDeserializerProvider._createDeserializer(StdDeserializerProvider.java:386) at org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCache2(StdDeserializerProvider.java:307) at org.codehaus.jackson.map.deser.StdDeserializerProvider._createAndCacheValueDeserializer(StdDeserializerProvider.java:287) at org.codehaus.jackson.map.deser.StdDeserializerProvider.findValueDeserializer(StdDeserializerProvider.java:136) at org.codehaus.jackson.map.deser.StdDeserializerProvider.findTypedValueDeserializer(StdDeserializerProvider.java:157) at org.codehaus.jackson.map.ObjectMapper._findRootDeserializer(ObjectMapper.java:2468) at org.codehaus.jackson.map.ObjectMapper._readMapAndClose(ObjectMapper.java:2402) at org.codehaus.jackson.map.ObjectMapper.readValue(ObjectMapper.java:1602)
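For what it's worth, the dependency side of this can be expressed in sbt. A sketch follows, assuming an sbt-built application jar; the Spark version and exact coordinates are examples only. Note that this only controls what goes into your application jar; it does not change the executor putting the Spark assembly (with its jackson 1.9.x) first on the classpath, which is presumably why the custom build was still needed.

// build.sbt fragment (sketch): keep spark-sql's jackson 1.9.x out of the
// application jar and pin jackson-core-asl to 1.8.8, which still has JsonClass.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "1.0.0" % "provided"
    exclude("org.codehaus.jackson", "jackson-core-asl")
    exclude("org.codehaus.jackson", "jackson-mapper-asl"),
  "org.codehaus.jackson" % "jackson-core-asl" % "1.8.8",
  "org.codehaus.jackson" % "jackson-mapper-asl" % "1.8.8"
)

// Or, without the excludes, force one version across the whole dependency graph:
dependencyOverrides += "org.codehaus.jackson" % "jackson-core-asl" % "1.8.8"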
Re: TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
I give up, communication must be blocked by the complex EC2 network topology (though the error information indeed needs some improvement). It doesn't make sense to run a client thousands of miles away to communicate frequently with workers. I have moved everything to EC2 now. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tp8247p8444.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Improving Spark multithreaded performance?
Hi Kyle, A few questions: 1) Did you use `setIntercept(true)`? 2) How many features? I'm a little worried about driver's load because the final aggregation and weights update happen on the driver. Did you check driver's memory usage as well? Best, Xiangrui On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: As far as I can tell there are is no data to broadcast (unless there is something internal to mllib that needs to be broadcast) I've coalesced the input RDDs to keep the number of partitions limited. When running, I've tried to get up to 500 concurrent stages, and I've coalesced the RDDs down to 2 partitions, so about 1000 tasks. Despite having over 500 threads in the threadpool working on mllib tasks, the total CPU usage never really goes above 150%. I've tried increasing 'spark.akka.threads' but that doesn't seem to do anything. My one thought would be that maybe because I'm using MLUtils.kFold to generate the RDDs is that because I have so many tasks working off RDDs that are permutations of original RDDs that maybe that is creating some sort of dependency bottleneck. Kyle On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson ilike...@gmail.com wrote: I don't have specific solutions for you, but the general things to try are: - Decrease task size by broadcasting any non-trivial objects. - Increase duration of tasks by making them less fine-grained. How many tasks are you sending? I've seen in the past something like 25 seconds for ~10k total medium-sized tasks. On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: I'm working to set up a calculation that involves calling mllib's SVMWithSGD.train several thousand times on different permutations of the data. I'm trying to run the separate jobs using a threadpool to dispatch the different requests to a spark context connected a Mesos's cluster, using course scheduling, and a max of 2000 cores on Spark 1.0. Total utilization of the system is terrible. Most of the 'aggregate at GradientDescent.scala:178' stages(where mllib spends most of its time) take about 3 seconds, but have ~25 seconds of scheduler delay time. What kind of things can I do to improve this? Kyle
Re: TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
Try to use --executor-memory 12g with spark-submit. Or you can set it in conf/spark-defaults.conf and rsync it to all workers and then restart. -Xiangrui On Fri, Jun 27, 2014 at 1:05 PM, Peng Cheng pc...@uow.edu.au wrote: I give up, communication must be blocked by the complex EC2 network topology (though the error information indeed need some improvement). It doesn't make sense to run a client thousands miles away to communicate frequently with workers. I have moved everything to EC2 now. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/TaskSchedulerImpl-Initial-job-has-not-accepted-any-resources-check-your-cluster-UI-to-ensure-that-woy-tp8247p8444.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Integrate spark-shell into officially supported web ui/api plug-in? What do you think?
This will be handy for demos and quick prototyping, as the command-line REPL doesn't support a lot of editor features; also, you don't need to SSH into your worker/master if your client is behind a NAT wall. Since the Spark codebase has a minimalistic design philosophy, I don't think this component can make it into the main repository. However, it can be an independent project that is also supported by the community (like Solr/ElasticSearch to Lucene). I've reviewed and tested a few REPL web UIs, including: - Scala-notebook: https://github.com/Bridgewater/scala-notebook - Tinsmiths: https://github.com/kouphax/tinsmith - IScala: https://github.com/mattpap/IScala - Codebrew: https://codebrew.io/ However, they are either too heavyweight, or their ILoop is buried very deep (sometimes even in another library). I'm interested in working on this part; has anyone experimented with a similar solution before? Yours Peng -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Integrate-spark-shell-into-officially-supported-web-ui-api-plug-in-What-do-you-think-tp8447.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: problem when start spark streaming in cluster mode
Hey Haoming, Actually akka.loggers has already been set to akka.event.slf4j.Slf4jLogger. You can check https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala Regards, SY On Fri, Jun 27, 2014 at 3:55 PM, Haoming Zhang haoming.zh...@outlook.com wrote: Hi Siyuan, Can you try this solution? http://stackoverflow.com/questions/21943353/akka-2-3-0-fails-to-load-slf4jeventhandler-class-with-java-lang-classnotfounde Best -- Date: Fri, 27 Jun 2014 14:18:59 -0400 Subject: problem when start spark streaming in cluster mode From: hsy...@gmail.com To: user@spark.apache.org Hi all, I can start a spark streaming app in Client mode on a Pseudo-standalone cluster on my local machine. However when I tried to start it in Cluster mode. It always got the following exception on the Driver. Exception in thread main akka.ConfigurationException: Could not start logger due to [akka.ConfigurationException: Logger specified in config can't be loaded [akka.event.slf4j.Slf4jLogger] due to [akka.event.Logging$LoggerInitializationException: Logger log1-Slf4jLogger did not respond with LoggerInitialized, sent instead [TIMEOUT]]] Can someone help? Thanks, siyuan
Could not compute split, block not found
Hi, I am running a spark streaming job with 1 minute as the batch size. It ran around 84 minutes and was killed because of the exception with the following information: *java.lang.Exception: Could not compute split, block input-0-1403893740400 not found* Before it was killed, it was able to correctly generate output for each batch. Any help on this will be greatly appreciated. Bill
Re: ElasticSearch enrich
Ok I found dynamic resources, but I have a frustrating problem. This is the flow: kafka - enrich X - enrich Y - enrich Z - foreachRDD - save My problem is: if I do this it's not work, the enrich functions not called, but if I put a print it's does. for example if I do this: kafka - enrich X - enrich Y - print - enrich Z - foreachRDD The enrich X and enrich Y called but enrich Z not if I put the print after the enrich Z it's will be printed. How can I solve this? (what can I do to call the foreachRDD I put breakpoint inside the map function (where I'm generate the writable) but it's not called) Any idea? b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Fri, Jun 27, 2014 at 4:53 PM, boci boci.b...@gmail.com wrote: Another question. In the foreachRDD I will initialize the JobConf, but in this place how can I get information from the items? I have an identifier in the data which identify the required ES index (so how can I set dynamic index in the foreachRDD) ? b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau hol...@pigscanfly.ca wrote: Just your luck I happened to be working on that very talk today :) Let me know how your experiences with Elasticsearch Spark go :) On Thu, Jun 26, 2014 at 3:17 PM, boci boci.b...@gmail.com wrote: Wow, thanks your fast answer, it's help a lot... b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau hol...@pigscanfly.ca wrote: Hi b0c1, I have an example of how to do this in the repo for my talk as well, the specific example is at https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and then call saveAsHadoopDataset on the RDD that gets passed into the function we provide to foreachRDD. e.g. stream.foreachRDD{(data, time) = val jobconf = ... data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf) } Hope that helps :) Cheers, Holden :) On Thu, Jun 26, 2014 at 2:23 PM, boci boci.b...@gmail.com wrote: Thanks. I without local option I can connect with es remote, now I only have one problem. How can I use elasticsearch-hadoop with spark streaming? I mean DStream doesn't have saveAsHadoopFiles method, my second problem the output index is depend by the input data. Thanks -- Skype: boci13, Hangout: boci.b...@gmail.com On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath nick.pentre...@gmail.com wrote: You can just add elasticsearch-hadoop as a dependency to your project to user the ESInputFormat and ESOutputFormat ( https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics here: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html For testing, yes I think you will need to start ES in local mode (just ./bin/elasticsearch) and use the default config (host = localhost, port = 9200). On Thu, Jun 26, 2014 at 9:04 AM, boci boci.b...@gmail.com wrote: That's okay, but hadoop has ES integration. what happened if I run saveAsHadoopFile without hadoop (or I must need to pull up hadoop programatically? 
(if I can)) b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau hol...@pigscanfly.ca wrote: On Wed, Jun 25, 2014 at 4:16 PM, boci boci.b...@gmail.com wrote: Hi guys, thanks the direction now I have some problem/question: - in local (test) mode I want to use ElasticClient.local to create es connection, but in prodution I want to use ElasticClient.remote, to this I want to pass ElasticClient to mapPartitions, or what is the best practices? In this case you probably want to make the ElasticClient inside of mapPartitions (since it isn't serializable) and if you want to use a different client in local mode just have a flag that control what type of client you create. - my stream output is write into elasticsearch. How can I test output.saveAsHadoopFile[ESOutputFormat](-) in local environment? - After store the enriched data into ES, I want to generate aggregated data (EsInputFormat) how can I test it in local? I think the simplest thing to do would be use the same client in mode and just start single node
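Holden's foreachRDD snippet quoted above loses its arrow ("=>") in the archive, so here is a slightly expanded sketch of the same pattern. The index/type ("events/event"), the ES address, and the (id, payload) record shape are placeholders, and depending on the elasticsearch-hadoop version the output format class has been called ESOutputFormat or EsOutputFormat.

import org.apache.hadoop.io.{MapWritable, NullWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext._   // brings in saveAsHadoopDataset for pair RDDs
import org.apache.spark.streaming.dstream.DStream
import org.elasticsearch.hadoop.mr.EsOutputFormat

// Sketch: push each micro-batch of (id, payload) records into Elasticsearch
// through the old Hadoop API, as Holden describes.
def saveStreamToEs(stream: DStream[(String, String)]): Unit = {
  stream.foreachRDD { (rdd, time) =>
    val jobConf = new JobConf(rdd.sparkContext.hadoopConfiguration)
    jobConf.setOutputFormat(classOf[EsOutputFormat])
    jobConf.setOutputKeyClass(classOf[NullWritable])
    jobConf.setOutputValueClass(classOf[MapWritable])
    jobConf.set("es.resource", "events/event")   // placeholder index/type
    jobConf.set("es.nodes", "localhost:9200")    // placeholder ES endpoint

    // EsOutputFormat ignores the key and indexes the MapWritable value.
    val writables = rdd.map { case (id, payload) =>
      val doc = new MapWritable()
      doc.put(new Text("id"), new Text(id))
      doc.put(new Text("payload"), new Text(payload))
      (NullWritable.get(), doc)
    }
    writables.saveAsHadoopDataset(jobConf)
  }
}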
Re: ElasticSearch enrich
So a few quick questions: 1) What cluster are you running this against? Is it just local? Have you tried local[4]? 2) When you say breakpoint, how are you setting this break point? There is a good chance your breakpoint mechanism doesn't work in a distributed environment, could you instead cause a side effect (like writing to a file)? Cheers, Holden :) On Fri, Jun 27, 2014 at 2:04 PM, boci boci.b...@gmail.com wrote: Ok I found dynamic resources, but I have a frustrating problem. This is the flow: kafka - enrich X - enrich Y - enrich Z - foreachRDD - save My problem is: if I do this it's not work, the enrich functions not called, but if I put a print it's does. for example if I do this: kafka - enrich X - enrich Y - print - enrich Z - foreachRDD The enrich X and enrich Y called but enrich Z not if I put the print after the enrich Z it's will be printed. How can I solve this? (what can I do to call the foreachRDD I put breakpoint inside the map function (where I'm generate the writable) but it's not called) Any idea? b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Fri, Jun 27, 2014 at 4:53 PM, boci boci.b...@gmail.com wrote: Another question. In the foreachRDD I will initialize the JobConf, but in this place how can I get information from the items? I have an identifier in the data which identify the required ES index (so how can I set dynamic index in the foreachRDD) ? b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau hol...@pigscanfly.ca wrote: Just your luck I happened to be working on that very talk today :) Let me know how your experiences with Elasticsearch Spark go :) On Thu, Jun 26, 2014 at 3:17 PM, boci boci.b...@gmail.com wrote: Wow, thanks your fast answer, it's help a lot... b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau hol...@pigscanfly.ca wrote: Hi b0c1, I have an example of how to do this in the repo for my talk as well, the specific example is at https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and then call saveAsHadoopDataset on the RDD that gets passed into the function we provide to foreachRDD. e.g. stream.foreachRDD{(data, time) = val jobconf = ... data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf) } Hope that helps :) Cheers, Holden :) On Thu, Jun 26, 2014 at 2:23 PM, boci boci.b...@gmail.com wrote: Thanks. I without local option I can connect with es remote, now I only have one problem. How can I use elasticsearch-hadoop with spark streaming? I mean DStream doesn't have saveAsHadoopFiles method, my second problem the output index is depend by the input data. Thanks -- Skype: boci13, Hangout: boci.b...@gmail.com On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath nick.pentre...@gmail.com wrote: You can just add elasticsearch-hadoop as a dependency to your project to user the ESInputFormat and ESOutputFormat ( https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics here: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html For testing, yes I think you will need to start ES in local mode (just ./bin/elasticsearch) and use the default config (host = localhost, port = 9200). On Thu, Jun 26, 2014 at 9:04 AM, boci boci.b...@gmail.com wrote: That's okay, but hadoop has ES integration. 
what happened if I run saveAsHadoopFile without hadoop (or I must need to pull up hadoop programatically? (if I can)) b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau hol...@pigscanfly.ca wrote: On Wed, Jun 25, 2014 at 4:16 PM, boci boci.b...@gmail.com wrote: Hi guys, thanks the direction now I have some problem/question: - in local (test) mode I want to use ElasticClient.local to create es connection, but in prodution I want to use ElasticClient.remote, to this I want to pass ElasticClient to mapPartitions, or what is the best practices? In this case you probably want to make the ElasticClient inside of mapPartitions (since it isn't serializable) and if you want to use a different
RE: ElasticSearch enrich
b0c1http://apache-spark-user-list.1001560.n3.nabble.com/template/NamlServlet.jtp?macro=user_nodesuser=1215, could you post your code? I am interested in your solution. Thanks Adrian From: boci [mailto:boci.b...@gmail.com] Sent: June-26-14 6:17 PM To: user@spark.apache.org Subject: Re: ElasticSearch enrich Wow, thanks your fast answer, it's help a lot... b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.commailto:boci.b...@gmail.com On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau hol...@pigscanfly.camailto:hol...@pigscanfly.ca wrote: Hi b0c1, I have an example of how to do this in the repo for my talk as well, the specific example is at https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and then call saveAsHadoopDataset on the RDD that gets passed into the function we provide to foreachRDD. e.g. stream.foreachRDD{(data, time) = val jobconf = ... data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf) } Hope that helps :) Cheers, Holden :) On Thu, Jun 26, 2014 at 2:23 PM, boci boci.b...@gmail.commailto:boci.b...@gmail.com wrote: Thanks. I without local option I can connect with es remote, now I only have one problem. How can I use elasticsearch-hadoop with spark streaming? I mean DStream doesn't have saveAsHadoopFiles method, my second problem the output index is depend by the input data. Thanks -- Skype: boci13, Hangout: boci.b...@gmail.commailto:boci.b...@gmail.com On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath nick.pentre...@gmail.commailto:nick.pentre...@gmail.com wrote: You can just add elasticsearch-hadoop as a dependency to your project to user the ESInputFormat and ESOutputFormat (https://github.com/elasticsearch/elasticsearch-hadoop). Some other basics here: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html For testing, yes I think you will need to start ES in local mode (just ./bin/elasticsearch) and use the default config (host = localhost, port = 9200). On Thu, Jun 26, 2014 at 9:04 AM, boci boci.b...@gmail.commailto:boci.b...@gmail.com wrote: That's okay, but hadoop has ES integration. what happened if I run saveAsHadoopFile without hadoop (or I must need to pull up hadoop programatically? (if I can)) b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.commailto:boci.b...@gmail.com On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau hol...@pigscanfly.camailto:hol...@pigscanfly.ca wrote: On Wed, Jun 25, 2014 at 4:16 PM, boci boci.b...@gmail.commailto:boci.b...@gmail.com wrote: Hi guys, thanks the direction now I have some problem/question: - in local (test) mode I want to use ElasticClient.local to create es connection, but in prodution I want to use ElasticClient.remote, to this I want to pass ElasticClient to mapPartitions, or what is the best practices? In this case you probably want to make the ElasticClient inside of mapPartitions (since it isn't serializable) and if you want to use a different client in local mode just have a flag that control what type of client you create. - my stream output is write into elasticsearch. How can I test output.saveAsHadoopFile[ESOutputFormat](-) in local environment? - After store the enriched data into ES, I want to generate aggregated data (EsInputFormat) how can I test it in local? I think the simplest thing to do would be use the same client in mode and just start single node elastic search cluster. 
Thanks guys b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.commailto:boci.b...@gmail.com On Wed, Jun 25, 2014 at 1:33 AM, Holden Karau hol...@pigscanfly.camailto:hol...@pigscanfly.ca wrote: So I'm giving a talk at the Spark summit on using Spark ElasticSearch, but for now if you want to see a simple demo which uses elasticsearch for geo input you can take a look at my quick dirty implementation with TopTweetsInALocation ( https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/TopTweetsInALocation.scala ). This approach uses the ESInputFormat which avoids the difficulty of having to manually create ElasticSearch clients. This approach might not work for your data, e.g. if you need to create a query for each record in your RDD. If this is the case, you could instead look at using mapPartitions and setting up your Elasticsearch connection inside of that, so you could then re-use
Re: ElasticSearch enrich
This is a simply scalatest. I start a SparkConf, set the master to local (set the serializer etc), pull up kafka and es connection send a message to kafka and wait 30sec to processing. It's run in IDEA no magick trick. b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Fri, Jun 27, 2014 at 11:11 PM, Holden Karau hol...@pigscanfly.ca wrote: So a few quick questions: 1) What cluster are you running this against? Is it just local? Have you tried local[4]? 2) When you say breakpoint, how are you setting this break point? There is a good chance your breakpoint mechanism doesn't work in a distributed environment, could you instead cause a side effect (like writing to a file)? Cheers, Holden :) On Fri, Jun 27, 2014 at 2:04 PM, boci boci.b...@gmail.com wrote: Ok I found dynamic resources, but I have a frustrating problem. This is the flow: kafka - enrich X - enrich Y - enrich Z - foreachRDD - save My problem is: if I do this it's not work, the enrich functions not called, but if I put a print it's does. for example if I do this: kafka - enrich X - enrich Y - print - enrich Z - foreachRDD The enrich X and enrich Y called but enrich Z not if I put the print after the enrich Z it's will be printed. How can I solve this? (what can I do to call the foreachRDD I put breakpoint inside the map function (where I'm generate the writable) but it's not called) Any idea? b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Fri, Jun 27, 2014 at 4:53 PM, boci boci.b...@gmail.com wrote: Another question. In the foreachRDD I will initialize the JobConf, but in this place how can I get information from the items? I have an identifier in the data which identify the required ES index (so how can I set dynamic index in the foreachRDD) ? b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau hol...@pigscanfly.ca wrote: Just your luck I happened to be working on that very talk today :) Let me know how your experiences with Elasticsearch Spark go :) On Thu, Jun 26, 2014 at 3:17 PM, boci boci.b...@gmail.com wrote: Wow, thanks your fast answer, it's help a lot... b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau hol...@pigscanfly.ca wrote: Hi b0c1, I have an example of how to do this in the repo for my talk as well, the specific example is at https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and then call saveAsHadoopDataset on the RDD that gets passed into the function we provide to foreachRDD. e.g. stream.foreachRDD{(data, time) = val jobconf = ... data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf) } Hope that helps :) Cheers, Holden :) On Thu, Jun 26, 2014 at 2:23 PM, boci boci.b...@gmail.com wrote: Thanks. I without local option I can connect with es remote, now I only have one problem. How can I use elasticsearch-hadoop with spark streaming? I mean DStream doesn't have saveAsHadoopFiles method, my second problem the output index is depend by the input data. Thanks -- Skype: boci13, Hangout: boci.b...@gmail.com On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath nick.pentre...@gmail.com wrote: You can just add elasticsearch-hadoop as a dependency to your project to user the ESInputFormat and ESOutputFormat ( https://github.com/elasticsearch/elasticsearch-hadoop). 
Some other basics here: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html For testing, yes I think you will need to start ES in local mode (just ./bin/elasticsearch) and use the default config (host = localhost, port = 9200). On Thu, Jun 26, 2014 at 9:04 AM, boci boci.b...@gmail.com wrote: That's okay, but hadoop has ES integration. what happened if I run saveAsHadoopFile without hadoop (or I must need to pull up hadoop programatically? (if I can)) b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Thu, Jun 26, 2014 at 1:20 AM, Holden Karau hol...@pigscanfly.ca wrote: On Wed, Jun 25, 2014
Re: Spark standalone network configuration problems
I switched which machine was the master and which was the dedicated worker, and now it works just fine. I discovered machine2 is on my department's DMZ; machine1 is not. I suspect the departmental firewall was causing problems. By moving the master to machine2, that seems to have solved my problems. Thank you all very much for your help. I'm sure I'll have other questions soon :) Regards, Shannon On 6/27/14, 3:22 PM, Sujeet Varakhedi wrote: Looks like your driver is not able to connect to the remote executor on machine2/130.49.226.148:60949 http://130.49.226.148:60949/. Cn you check if the master machine can route to 130.49.226.148 Sujeet On Fri, Jun 27, 2014 at 12:04 PM, Shannon Quinn squ...@gatech.edu mailto:squ...@gatech.edu wrote: For some reason, commenting out spark.driver.host and spark.driver.port fixed something...and broke something else (or at least revealed another problem). For reference, the only lines I have in my spark-defaults.conf now: spark.app.name http://spark.app.name myProg spark.masterspark://192.168.1.101:5060 http://192.168.1.101:5060 spark.executor.memory 8g spark.files.overwrite true It starts up, but has problems with machine2. For some reason, machine2 is having trouble communicating with *itself*. Here are the worker logs of one of the failures (there are 10 before it quits): Spark assembly has been built with Hive, including Datanucleus jars on classpath 14/06/27 14:55:13 INFO ExecutorRunner: Launch command: java -cp ::/home/spark/spark-1.0.0-bin-hadoop2/conf:/home/spark/spark-1.0.0-bin-hadoop2/lib/spark-assembly-1.0.0-hadoop2.2.0.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/home/spark/spark-1.0.0-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar -XX:MaxPermSize=128m -Xms8192M -Xmx8192M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@machine1:46378/user/CoarseGrainedScheduler 7 machine2 8 akka.tcp://sparkWorker@machine2:48019/user/Worker app-20140627144512-0001 14/06/27 14:56:54 INFO Worker: Executor app-20140627144512-0001/7 finished with state FAILED message Command exited with code 1 exitStatus 1 14/06/27 14:56:54 INFO LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%40130.49.226.148%3A53561-38#-1924573003] was not delivered. [10] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
14/06/27 14:56:54 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@machine2:48019] - [akka.tcp://sparkExecutor@machine2:60949]: Error [Association failed with [akka.tcp://sparkExecutor@machine2:60949]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@machine2:60949] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: machine2/130.49.226.148:60949 http://130.49.226.148:60949 ] 14/06/27 14:56:54 INFO Worker: Asked to launch executor app-20140627144512-0001/8 for Funtown, USA 14/06/27 14:56:54 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@machine2:48019] - [akka.tcp://sparkExecutor@machine2:60949]: Error [Association failed with [akka.tcp://sparkExecutor@machine2:60949]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@machine2:60949] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: machine2/130.49.226.148:60949 http://130.49.226.148:60949 ] 14/06/27 14:56:54 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@machine2:48019] - [akka.tcp://sparkExecutor@machine2:60949]: Error [Association failed with [akka.tcp://sparkExecutor@machine2:60949]] [ akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@machine2:60949] Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: machine2/130.49.226.148:60949 http://130.49.226.148:60949 ] Port 48019 on machine2 is indeed open, connected, and listening. Any ideas? Thanks! Shannon On 6/27/14, 1:54 AM, sujeetv wrote: Try to explicitly set set the spark.driver.host property to the master's IP. Sujeet -- View this message in context:
Re: ElasticSearch enrich
Try setting the master to local[4] On Fri, Jun 27, 2014 at 2:17 PM, boci boci.b...@gmail.com wrote: This is a simply scalatest. I start a SparkConf, set the master to local (set the serializer etc), pull up kafka and es connection send a message to kafka and wait 30sec to processing. It's run in IDEA no magick trick. b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Fri, Jun 27, 2014 at 11:11 PM, Holden Karau hol...@pigscanfly.ca wrote: So a few quick questions: 1) What cluster are you running this against? Is it just local? Have you tried local[4]? 2) When you say breakpoint, how are you setting this break point? There is a good chance your breakpoint mechanism doesn't work in a distributed environment, could you instead cause a side effect (like writing to a file)? Cheers, Holden :) On Fri, Jun 27, 2014 at 2:04 PM, boci boci.b...@gmail.com wrote: Ok I found dynamic resources, but I have a frustrating problem. This is the flow: kafka - enrich X - enrich Y - enrich Z - foreachRDD - save My problem is: if I do this it's not work, the enrich functions not called, but if I put a print it's does. for example if I do this: kafka - enrich X - enrich Y - print - enrich Z - foreachRDD The enrich X and enrich Y called but enrich Z not if I put the print after the enrich Z it's will be printed. How can I solve this? (what can I do to call the foreachRDD I put breakpoint inside the map function (where I'm generate the writable) but it's not called) Any idea? b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Fri, Jun 27, 2014 at 4:53 PM, boci boci.b...@gmail.com wrote: Another question. In the foreachRDD I will initialize the JobConf, but in this place how can I get information from the items? I have an identifier in the data which identify the required ES index (so how can I set dynamic index in the foreachRDD) ? b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Fri, Jun 27, 2014 at 12:30 AM, Holden Karau hol...@pigscanfly.ca wrote: Just your luck I happened to be working on that very talk today :) Let me know how your experiences with Elasticsearch Spark go :) On Thu, Jun 26, 2014 at 3:17 PM, boci boci.b...@gmail.com wrote: Wow, thanks your fast answer, it's help a lot... b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com On Thu, Jun 26, 2014 at 11:48 PM, Holden Karau hol...@pigscanfly.ca wrote: Hi b0c1, I have an example of how to do this in the repo for my talk as well, the specific example is at https://github.com/holdenk/elasticsearchspark/blob/master/src/main/scala/com/holdenkarau/esspark/IndexTweetsLive.scala . Since DStream doesn't have a saveAsHadoopDataset we use foreachRDD and then call saveAsHadoopDataset on the RDD that gets passed into the function we provide to foreachRDD. e.g. stream.foreachRDD{(data, time) = val jobconf = ... data.map(prepareDataForIndexing).saveAsHadoopDataset(jobConf) } Hope that helps :) Cheers, Holden :) On Thu, Jun 26, 2014 at 2:23 PM, boci boci.b...@gmail.com wrote: Thanks. I without local option I can connect with es remote, now I only have one problem. How can I use elasticsearch-hadoop with spark streaming? I mean DStream doesn't have saveAsHadoopFiles method, my second problem the output index is depend by the input data. Thanks -- Skype: boci13, Hangout: boci.b...@gmail.com On Thu, Jun 26, 2014 at 10:10 AM, Nick Pentreath nick.pentre...@gmail.com wrote: You can just add elasticsearch-hadoop as a dependency to your project to user the ESInputFormat and ESOutputFormat ( https://github.com/elasticsearch/elasticsearch-hadoop). 
Some other basics here: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html For testing, yes I think you will need to start ES in local mode (just ./bin/elasticsearch) and use the default config (host = localhost, port = 9200). On Thu, Jun 26, 2014 at 9:04 AM, boci boci.b...@gmail.com wrote: That's okay, but hadoop has ES integration. what happened if I run saveAsHadoopFile without hadoop (or I must need to pull up hadoop programatically? (if I can)) b0c1 -- Skype: boci13, Hangout:
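A note on the local[4] suggestion: with the master set to plain "local" there is only one task slot, the receiver occupies it, and the enrich/foreachRDD stages never get scheduled, which matches the symptom of map functions that are never called. A minimal sketch of the test setup (app name and batch interval are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Plain "local" gives a single task slot; the Kafka receiver occupies it and
// the enrich/foreachRDD stages never run. "local[4]" leaves threads free.
val conf = new SparkConf()
  .setMaster("local[4]")
  .setAppName("es-enrich-test")
val ssc = new StreamingContext(conf, Seconds(5))
// ... build the Kafka stream, enrich steps and foreachRDD here ...
ssc.start()
ssc.awaitTermination()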
Re: Improving Spark multithreaded performance?
1) I'm using the static SVMWithSGD.train, with no options. 2) I have about 20,000 features (~5000 samples) that are being attached and trained against 14,000 different sets of labels (ie I'll be doing 14,000 different training runs against the same sets of features trying to figure out which labels can be learned), and I would also like to do cross fold validation. The driver doesn't seem to be using too much memory. I left it as -Xmx8g and it never complained. Kyle On Fri, Jun 27, 2014 at 1:18 PM, Xiangrui Meng men...@gmail.com wrote: Hi Kyle, A few questions: 1) Did you use `setIntercept(true)`? 2) How many features? I'm a little worried about driver's load because the final aggregation and weights update happen on the driver. Did you check driver's memory usage as well? Best, Xiangrui On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: As far as I can tell there are is no data to broadcast (unless there is something internal to mllib that needs to be broadcast) I've coalesced the input RDDs to keep the number of partitions limited. When running, I've tried to get up to 500 concurrent stages, and I've coalesced the RDDs down to 2 partitions, so about 1000 tasks. Despite having over 500 threads in the threadpool working on mllib tasks, the total CPU usage never really goes above 150%. I've tried increasing 'spark.akka.threads' but that doesn't seem to do anything. My one thought would be that maybe because I'm using MLUtils.kFold to generate the RDDs is that because I have so many tasks working off RDDs that are permutations of original RDDs that maybe that is creating some sort of dependency bottleneck. Kyle On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson ilike...@gmail.com wrote: I don't have specific solutions for you, but the general things to try are: - Decrease task size by broadcasting any non-trivial objects. - Increase duration of tasks by making them less fine-grained. How many tasks are you sending? I've seen in the past something like 25 seconds for ~10k total medium-sized tasks. On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote: I'm working to set up a calculation that involves calling mllib's SVMWithSGD.train several thousand times on different permutations of the data. I'm trying to run the separate jobs using a threadpool to dispatch the different requests to a spark context connected a Mesos's cluster, using course scheduling, and a max of 2000 cores on Spark 1.0. Total utilization of the system is terrible. Most of the 'aggregate at GradientDescent.scala:178' stages(where mllib spends most of its time) take about 3 seconds, but have ~25 seconds of scheduler delay time. What kind of things can I do to improve this? Kyle
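For readers following along, the concurrent-submission pattern being discussed looks roughly like the sketch below: Scala Futures over one shared SparkContext. The pool size, the fair-scheduler pool name and the data layout are assumptions, not taken from Kyle's code.

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Many training jobs submitted concurrently against one shared SparkContext.
def trainMany(sc: SparkContext,
              labelSets: Seq[RDD[LabeledPoint]],
              numIterations: Int = 100) = {
  implicit val ec = ExecutionContext.fromExecutorService(
    Executors.newFixedThreadPool(32))   // pool size is a guess, tune to taste

  labelSets.map { data =>
    Future {
      // Only takes effect if spark.scheduler.mode=FAIR is configured; local
      // properties are per-thread, so set it where train() actually runs.
      sc.setLocalProperty("spark.scheduler.pool", "training")
      SVMWithSGD.train(data, numIterations)
    }
  }   // Seq[Future[SVMModel]]; await or collect the results as needed
}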
Re: Spark vs Google cloud dataflow
Dean: Some interesting information... Do you know where I can read more about these coming changes to Scalding/Cascading? On Jun 27, 2014, at 9:40 AM, Dean Wampler deanwamp...@gmail.com wrote: ... and to be clear on the point, Summingbird is not limited to MapReduce. It abstracts over Scalding (which abstracts over Cascading, which is being moved from MR to Spark) and over Storm for event processing. On Fri, Jun 27, 2014 at 7:16 AM, Sean Owen so...@cloudera.com wrote: On Thu, Jun 26, 2014 at 9:15 AM, Aureliano Buendia buendia...@gmail.com wrote: Summingbird is for map/reduce. Dataflow is the third generation of google's map/reduce, and it generalizes map/reduce the way Spark does. See more about this here: http://youtu.be/wtLJPvx7-ys?t=2h37m8s Yes, my point was that Summingbird is similar in that it is a higher-level service for batch/streaming computation, not that it is similar for being MapReduce-based. It seems Dataflow is based on this paper: http://pages.cs.wisc.edu/~akella/CS838/F12/838-CloudPapers/FlumeJava.pdf FlumeJava maps to Crunch in the Hadoop ecosystem. I think Dataflows is more than that but yeah that seems to be some of the 'language'. It is similar in that it is a distributed collection abstraction. -- Dean Wampler, Ph.D. Typesafe @deanwampler http://typesafe.com http://polyglotprogramming.com
Re: Spark vs Google cloud dataflow
Sorry. Never mind... I guess that's what Summingbird is all about. Never heard of it. On Jun 27, 2014, at 7:10 PM, Marco Shaw marco.s...@gmail.com wrote: Dean: Some interesting information... Do you know where I can read more about these coming changes to Scalding/Cascading? On Jun 27, 2014, at 9:40 AM, Dean Wampler deanwamp...@gmail.com wrote: ... and to be clear on the point, Summingbird is not limited to MapReduce. It abstracts over Scalding (which abstracts over Cascading, which is being moved from MR to Spark) and over Storm for event processing. On Fri, Jun 27, 2014 at 7:16 AM, Sean Owen so...@cloudera.com wrote: On Thu, Jun 26, 2014 at 9:15 AM, Aureliano Buendia buendia...@gmail.com wrote: Summingbird is for map/reduce. Dataflow is the third generation of google's map/reduce, and it generalizes map/reduce the way Spark does. See more about this here: http://youtu.be/wtLJPvx7-ys?t=2h37m8s Yes, my point was that Summingbird is similar in that it is a higher-level service for batch/streaming computation, not that it is similar for being MapReduce-based. It seems Dataflow is based on this paper: http://pages.cs.wisc.edu/~akella/CS838/F12/838-CloudPapers/FlumeJava.pdf FlumeJava maps to Crunch in the Hadoop ecosystem. I think Dataflows is more than that but yeah that seems to be some of the 'language'. It is similar in that it is a distributed collection abstraction. -- Dean Wampler, Ph.D. Typesafe @deanwampler http://typesafe.com http://polyglotprogramming.com
Re: Spark vs Google cloud dataflow
DataFlow is based on two papers, MillWheel for Stream processing and FlumeJava for programming optimization and abstraction. Millwheel http://research.google.com/pubs/pub41378.html FlumeJava http://dl.acm.org/citation.cfm?id=1806638 Here is my blog entry on this http://texploration.wordpress.com/2014/06/26/google-dataflow-service-to-fight-against-amazon-kinesis/ On Fri, Jun 27, 2014 at 5:16 AM, Sean Owen so...@cloudera.com wrote: On Thu, Jun 26, 2014 at 9:15 AM, Aureliano Buendia buendia...@gmail.com wrote: Summingbird is for map/reduce. Dataflow is the third generation of google's map/reduce, and it generalizes map/reduce the way Spark does. See more about this here: http://youtu.be/wtLJPvx7-ys?t=2h37m8s Yes, my point was that Summingbird is similar in that it is a higher-level service for batch/streaming computation, not that it is similar for being MapReduce-based. It seems Dataflow is based on this paper: http://pages.cs.wisc.edu/~akella/CS838/F12/838-CloudPapers/FlumeJava.pdf FlumeJava maps to Crunch in the Hadoop ecosystem. I think Dataflows is more than that but yeah that seems to be some of the 'language'. It is similar in that it is a distributed collection abstraction.
hadoop + yarn + spark
Hello, I have installed Spark on top of Hadoop + YARN. When I launch the pyspark shell and try to compute something, I get this error: Error from python worker: /usr/bin/python: No module named pyspark The pyspark module should be there; do I have to put an external link to it? --Sanghamitra. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/hadoop-yarn-spark-tp8466.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Anybody changed their mind about going to the Spark Summit 2014
Hi All: I was wondering if anybody had bought a ticket for the upcoming Spark Summit 2014 this coming week and had changed their mind about going. Let me know, since it has sold out and I can't buy a ticket anymore, I would be interested in buying it. Best, -- Cesar Arevalo Software Engineer ❘ Zephyr Health 450 Mission Street, Suite #201 ❘ San Francisco, CA 94105 m: +1 415-571-7687 ❘ s: arevalocesar | t: @zephyrhealth https://twitter.com/zephyrhealth o: +1 415-529-7649 ❘ f: +1 415-520-9288 http://www.zephyrhealth.com
Re: Integrate spark-shell into officially supported web ui/api plug-in? What do you think?
That would be really cool with IPython, but I'm still wondering if all language features are supported; I need these 2 in particular: 1. importing classes and an ILoop from external jars (so I can point it to SparkILoop or the Sparkbinding ILoop of Apache Mahout instead of Scala's default ILoop) 2. implicit typecasts/wrappers and implicit variables (widely used in SparkContext.scala) I'll be able to start experimenting immediately if someone can confirm these features. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Integrate-spark-shell-into-officially-supported-web-ui-api-plug-in-What-do-you-think-tp8447p8469.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Interconnect benchmarking
Hi, According to the research paper below by Matei Zaharia, Spark's creator, http://people.csail.mit.edu/matei/papers/2013/sosp_spark_streaming.pdf he says on page 10 that: Grep is network-bound due to the cost to replicate the input data to multiple nodes. So, I guess Grep can be a good initial recommendation. But I would like to know about other workloads too. Best Regards. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Interconnect-benchmarking-tp8467p8470.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Interconnect benchmarking
A simple throughput test is also repartition()ing a large RDD. This also stresses the disks, though, so you might try to mount your spark temporary directory as a ramfs. On Fri, Jun 27, 2014 at 5:57 PM, danilopds danilob...@gmail.com wrote: Hi, According with the research paper bellow of Mathei Zaharia, Spark's creator, http://people.csail.mit.edu/matei/papers/2013/sosp_spark_streaming.pdf He says on page 10 that: Grep is network-bound due to the cost to replicate the input data to multiple nodes. So, I guess a can be a good initial recommendation. But I would like to know others workloads too. Best Regards. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Interconnect-benchmarking-tp8467p8470.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
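A minimal sketch of that kind of repartition() probe (record count, payload size and partition counts are arbitrary, and the master is expected to come from spark-submit):

import org.apache.spark.{SparkConf, SparkContext}

// Rough interconnect probe: ~100 bytes per record, forced through a full shuffle.
val sc = new SparkContext(new SparkConf().setAppName("shuffle-benchmark"))

val records = 50L * 1000 * 1000
val data = sc.parallelize(0L until records, 200)
  .map(i => (i, Array.fill[Byte](100)(0)))

val start = System.nanoTime()
data.repartition(200).count()   // every record crosses the wire
println(f"shuffled ~${records * 100 / 1e9}%.1f GB in ${(System.nanoTime() - start) / 1e9}%.1f s")

Note that the timing also includes regenerating the synthetic input, so treat the number as a rough bound on shuffle throughput rather than a precise measurement.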
Re: Spark job tracker.
I know this is a very trivial question to ask, but I'm a complete newbie to this stuff so I don't have any clue on this. Any help is much appreciated. For example, if I have a class like the one below, when I run it through the command line I want to see progress status, something like: 10% completed... 30% completed... 100% completed... Job done! I am using Spark 1.0 on YARN and using the Java API.

import java.util.Arrays;
import java.util.List;

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

public class MyJavaWordCount {
  public static void main(String[] args) throws Exception {
    if (args.length < 2) {
      System.err.println("Usage: MyJavaWordCount <master> <file>");
      System.exit(1);
    }
    System.out.println("args[0]: master=" + args[0]);
    System.out.println("args[1]: file=" + args[1]);

    JavaSparkContext ctx = new JavaSparkContext(
        args[0], "MyJavaWordCount",
        System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));

    JavaRDD<String> lines = ctx.textFile(args[1], 1);

    // split each line into words
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String s) {
        return Arrays.asList(s.split(" "));
      }
    });

    // (K, V) pairs: (word, 1)
    JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
      }
    });

    // sum the counts per word
    JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    });

    List<Tuple2<String, Integer>> output = counts.collect();
    for (Tuple2 tuple : output) {
      System.out.println(tuple._1 + ": " + tuple._2);
    }
    System.exit(0);
  }
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-tracker-tp8367p8472.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
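One way to get a rough progress printout is to register a SparkListener and count finished tasks per stage. A sketch in Scala follows (callback and field names as of Spark 1.0; the ProgressListener name is made up, and from the Java API the same listener can be registered via ctx.sc().addSparkListener(...)):

import scala.collection.mutable
import org.apache.spark.scheduler._

// Prints per-stage task progress as tasks finish. This is only a progress
// hint per stage, not an exact job-level percentage.
class ProgressListener extends SparkListener {
  private val total = mutable.Map[Int, Int]()
  private val done = mutable.Map[Int, Int]()

  override def onStageSubmitted(s: SparkListenerStageSubmitted): Unit = synchronized {
    total(s.stageInfo.stageId) = s.stageInfo.numTasks
  }

  override def onTaskEnd(t: SparkListenerTaskEnd): Unit = synchronized {
    val stage = t.stageId
    done(stage) = done.getOrElse(stage, 0) + 1
    val all = total.getOrElse(stage, 0)
    if (all > 0) {
      println(s"stage $stage: ${done(stage)}/$all tasks (${100 * done(stage) / all}%) completed")
    }
  }
}

Register it once right after creating the context, e.g. sc.addSparkListener(new ProgressListener()) from Scala, or ctx.sc().addSparkListener(new ProgressListener()) from the Java API.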
HBase 0.96+ with Spark 1.0+
The present trunk is built and tested against HBase 0.94. I have tried various combinations of versions of HBase 0.96+ and Spark 1.0+ and all end up with 14/06/27 20:11:15 INFO HttpServer: Starting HTTP Server [error] (run-main-0) java.lang.SecurityException: class javax.servlet.FilterRegistration's signer information does not match signer information of other classes in the same package java.lang.SecurityException: class javax.servlet.FilterRegistration's signer information does not match signer information of other classes in the same package at java.lang.ClassLoader.checkCerts(ClassLoader.java:952) I have tried a number of different ways to exclude javax.servlet related jars. But none have avoided this error. Anyone have a (small-ish) build.sbt that works with later versions of HBase?
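Not a full build.sbt, but the workaround usually reported for the FilterRegistration signer clash is to keep the unsigned servlet jars that HBase drags in off the classpath. A sketch of the dependency section (HBase and Spark versions are examples, not a combination I have verified):

// build.sbt fragment (sketch)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.0.0" % "provided",
  "org.apache.hbase" % "hbase-common" % "0.98.3-hadoop2",
  "org.apache.hbase" % "hbase-client" % "0.98.3-hadoop2",
  "org.apache.hbase" % "hbase-server" % "0.98.3-hadoop2"
).map(_.excludeAll(
  ExclusionRule(organization = "javax.servlet"),
  ExclusionRule(organization = "org.mortbay.jetty")
))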
Re: hadoop + yarn + spark
Hi There, There is an issue with PySpark-on-YARN that requires users to build with Java 6. The issue has to do with how Java 6 and 7 package jar files differently. Can you try building Spark with Java 6 and trying again? - Patrick On Fri, Jun 27, 2014 at 5:00 PM, sdeb sangha...@gmail.com wrote: Hello, I have installed spark on top of hadoop + yarn. when I launch the pyspark shell try to compute something I get this error. Error from python worker: /usr/bin/python: No module named pyspark The pyspark module should be there, do I have to put an external link to it? --Sanghamitra. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/hadoop-yarn-spark-tp8466.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Distribute data from Kafka evenly on cluster
Hi, I have a number of questions using the Kafka receiver of Spark Streaming. Maybe someone has some more experience with that and can help me out. I have set up an environment for getting to know Spark, consisting of - a Mesos cluster with 3 only-slaves and 3 master-and-slaves, - 2 Kafka nodes, - 3 Zookeeper nodes providing service to both Kafka and Mesos. My Kafka cluster has only one topic with one partition (replicated to both nodes). When I start my Kafka receiver, it successfully connects to Kafka and does the processing, but it seems as if the (expensive) function in the final foreachRDD(...) is only executed on one node of my cluster, which is not what I had in mind when setting up the cluster ;-) So first, I was wondering about the parameter `topics: Map[String, Int]` to KafkaUtils.createStream(). Apparently it controls how many connections are made from my cluster nodes to Kafka. The Kafka doc at https://kafka.apache.org/documentation.html#introduction says each message published to a topic is delivered to one consumer instance within each subscribing consumer group and If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers. The Kafka docs *also* say: Note however that there cannot be more consumer instances than partitions. This seems to imply that with only one partition, increasing the number in my Map should have no effect. However, if I increase the number of streams for my one topic in my `topics` Map, I actually *do* see that the task in my foreachRDD(...) call is now executed on multiple nodes. Maybe it's more of a Kafka question than a Spark one, but can anyone explain this to me? Should I always have more Kafka partitions than Mesos cluster nodes? So, assuming that changing the number in that Map is not what I want (although I don't know if it is), I tried to use .repartition(numOfClusterNodes) (which doesn't seem right if I want to add and remove Mesos nodes on demand). This *also* did spread the foreachRDD(...) action evenly – however, the function never seems to terminate, so I never get to process the next interval in the stream. A similar behavior can be observed when running locally, not on the cluster, then the program will not exit but instead hang after everything else has shut down. Any hints concerning this issue? Thanks Tobias
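Not an answer to the hang, but for reference the usual fan-in pattern for Kafka input is several receivers unioned together and then repartitioned before the expensive stage. A sketch, with the topic, group and counts as placeholders:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

def kafkaInput(ssc: StreamingContext, zkQuorum: String): Unit = {
  val numReceivers = 3    // placeholder: roughly one receiver per Kafka partition
  val numCores = 12       // placeholder: cores the expensive stage should be spread over

  // Each createStream() call is one receiver pinned to one executor; with a
  // single-partition topic only one of them actually gets data, which is why
  // adding partitions on the Kafka side matters.
  val streams = (1 to numReceivers).map { _ =>
    KafkaUtils.createStream(ssc, zkQuorum, "my-group", Map("my-topic" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER)
  }

  val unified = ssc.union(streams).repartition(numCores)
  unified.foreachRDD { rdd =>
    rdd.foreach(record => ())   // placeholder for the expensive per-record work
  }
}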