Re: Web UI and standalone apps
Hi Aureliano,

A Spark application is defined by everything executed within a given SparkContext. The application's web server runs on port 4040 of the machine where the driver of that application is executing; a single instance of the Spark shell is one example of a driver. The UI on port 4040 displays statistics about the application, such as the stages being executed, the number of tasks per stage, and the progress of the tasks within a stage. Other application statistics include the caching locations and percentages of the RDDs used within the application (and across its stages) and the garbage collection times of the tasks that have completed.

The Spark cluster is defined by all applications executing on top of the resources provisioned to your particular deployment of Spark. These resources are managed by the Spark Master, which contains the task scheduler and the cluster manager (unless you're using YARN or Mesos, in which case they provide the cluster manager). The UI on port 8080 is the UI of the Spark Master, and it is accessible on whichever node is currently running the Master. It displays cluster-wide statistics such as the number of available worker nodes, the number of JVM executor processes per worker node, the number of running applications using the cluster, et cetera.

In short, shutting down a Spark application kills the UI on port 4040 because the application has terminated, so there are no running statistics left to collect about it. The UI on port 8080, however, stays up and keeps reporting cluster-wide statistics until you shut the cluster down by stopping the Spark Master.

Hope that long-winded explanation made sense! Happy Holidays!

On Fri, Dec 27, 2013 at 9:23 AM, Aureliano Buendia wrote:
> Hi,
>
> I'm a bit confused about web UI access of a standalone Spark app.
>
> - When running a Spark app, a web server is launched at localhost:4040. When
> the standalone app execution is finished, the web server is shut down. What's
> the use of this web server? There is no way of reviewing the data when the
> standalone app exits.
>
> - Creating a SparkContext at spark://localhost:7077 creates another web UI.
> Is this web UI supposed to be used with localhost:4040, or is it a replacement?
>
> - Creating a context with spark://localhost:7077, and after running
> ./bin/start-all.sh, I get this warning:
>
> WARN ClusterScheduler: Initial job has not accepted any resources; check
> your cluster UI to ensure that workers are registered and have sufficient memory
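In case it helps to see that lifecycle in code, here is a minimal, hypothetical sketch of a standalone app (written against the Spark 0.8-era API); the master URL, app name, input path, and the sleep are placeholders for illustration, not anything from your setup:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object UiDemo {
  def main(args: Array[String]) {
    // Registering with the standalone master makes this application show up on the
    // master UI (port 8080); the driver also starts the application UI on port 4040.
    val sc = new SparkContext("spark://localhost:7077", "UiDemo")

    val wordCounts = sc.textFile("./some-input.txt")   // placeholder input path
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    println(wordCounts.count())

    // Port 4040 stays up only while this context is alive; sleep briefly so the
    // stage and task statistics can be inspected before the app terminates.
    Thread.sleep(60000)

    sc.stop()   // the 4040 UI goes away here; the master UI on 8080 keeps running
  }
}

Run against a cluster brought up with ./bin/start-all.sh, the app should be listed on the master UI at port 8080 while it runs, and the port 4040 UI should disappear as soon as sc.stop() is reached.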
Re: Worker failed to connect when build with SPARK_HADOOP_VERSION=2.2.0
That's a strange behavior. It has no problem connecting to the HDFS NameNode (v2.2.0) and reading and writing files, but this only works in spark shell (and in pyspark shell). The Spark workers not connecting to the Spark master shouldn't have anything to do with the version of Hadoop against which Spark is compiled... or am I completely missing something? On Mon, Dec 2, 2013 at 4:11 AM, Maxime Lemaire wrote: > Horia, > if you dont need yarn support you can get it work by setting SPARK_YARN to > false : > *SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=false sbt/sbt assembly* > > Raymond, > Ok, thank you, so thats why, im using the lastest release 0.8.0 (september > 25, 2013) > > > > > 2013/12/2 Liu, Raymond > > What version of code you are using? >> >> 2.2.0 support not yet merged into trunk. Check out >> https://github.com/apache/incubator-spark/pull/199 >> >> Best Regards, >> Raymond Liu >> >> From: horia@gmail.com [mailto:horia@gmail.com] On Behalf Of Horia >> Sent: Monday, December 02, 2013 3:00 PM >> To: user@spark.incubator.apache.org >> Subject: Re: Worker failed to connect when build with >> SPARK_HADOOP_VERSION=2.2.0 >> >> Has this been resolved? >> >> Forgive me if I missed the follow-up but I've been having the exact same >> problem. >> >> - Horia >> >> >> On Fri, Nov 22, 2013 at 5:38 AM, Maxime Lemaire >> wrote: >> Hi all, >> When im building Spark with Hadoop 2.2.0 support, workers cant connect to >> Spark master anymore. >> Network is up and hostnames are correct. Tcpdump can clearly see workers >> trying to connect (tcpdump outputs at the end). >> >> Same set up with Spark build without SPARK_HADOOP_VERSION (or >> with SPARK_HADOOP_VERSION=2.0.5-alpha) is working fine ! >> >> Some details : >> >> pmtx-master01 : master >> pmtx-master02 : slave >> >> (behavior is the same if i launch both master and slave from the same box) >> >> Building HADOOP 2.2.0 support : >> >> fresh install on pmtx-master01 : >> # SPARK_HADOOP_VERSION=2.2.0 sbt/sbt assembly >> build successfull >> # >> >> fresh install on pmtx-master02 : >> # SPARK_HADOOP_VERSION=2.2.0 sbt/sbt assembly >> ...build successfull >> # >> >> On pmtx-master01 : >> # ./bin/start-master.sh >> starting org.apache.spark.deploy.master.Master, logging to >> /cluster/bin/spark-0.8.0-incubating/bin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-pmtx-master01.out >> # netstat -an | grep 7077 >> tcp6 0 0 10.90.XX.XX:7077:::* >> LISTEN >> # >> >> On pmtx-master02 : >> # nc -v pmtx-master01 7077 >> pmtx-master01 [10.90.XX.XX] 7077 (?) open >> # ./spark-class org.apache.spark.deploy.worker.Worker >> spark://pmtx-master01:7077 >> 13/11/22 10:57:50 INFO Slf4jEventHandler: Slf4jEventHandler started >> 13/11/22 10:57:50 INFO Worker: Starting Spark worker pmtx-master02:42271 >> with 8 cores, 22.6 GB RAM >> 13/11/22 10:57:50 INFO Worker: Spark home: /cluster/bin/spark >> 13/11/22 10:57:50 INFO WorkerWebUI: Started Worker web UI at >> http://pmtx-master02:8081 >> 13/11/22 10:57:50 INFO Worker: Connecting to master >> spark://pmtx-master01:7077 >> 13/11/22 10:57:50 ERROR Worker: Connection to master failed! Shutting >> down. >> # >> >> With spark-shell on pmtx-master02 : >> # MASTER=spark://pmtx-master01:7077 ./spark-shell >> Welcome to >> __ >> / __/__ ___ _/ /__ >> _\ \/ _ \/ _ `/ __/ '_/ >> /___/ .__/\_,_/_/ /_/\_\ version 0.8.0 >> /_/ >> >> Using Scala version 2.9.3 (Java HotSpot(TM) 64-Bit Server VM, Java >> 1.6.0_31) >> Initializing interpreter... >> Creating SparkContext... 
>> 13/11/22 11:19:29 INFO Slf4jEventHandler: Slf4jEventHandler started >> 13/11/22 11:19:29 INFO SparkEnv: Registering BlockManagerMaster >> 13/11/22 11:19:29 INFO MemoryStore: MemoryStore started with capacity >> 323.9 MB. >> 13/11/22 11:19:29 INFO DiskStore: Created local directory at >> /tmp/spark-local-20131122111929-3e3c >> 13/11/22 11:19:29 INFO ConnectionManager: Bound socket to port 42249 with >> id = ConnectionManagerId(pmtx-master02,42249) >> 13/11/22 11:19:29 INFO BlockManagerMaster: Trying to register BlockManager >> 13/11/22 11:19:29 INFO BlockManagerMaster: Registered BlockManager >> 13/11/22 11:19:29 INFO HttpBroadcast: Broadcast server started at >> http://10.90.66.67:52531 >> 13/11/22 11:19:29 INFO SparkEnv: Registering
Re: Worker failed to connect when build with SPARK_HADOOP_VERSION=2.2.0
Has this been resolved? Forgive me if I missed the follow-up but I've been having the exact same problem. - Horia On Fri, Nov 22, 2013 at 5:38 AM, Maxime Lemaire wrote: > Hi all, > When im building Spark with Hadoop 2.2.0 support, workers cant connect to > Spark master anymore. > Network is up and hostnames are correct. Tcpdump can clearly see workers > trying to connect (tcpdump outputs at the end). > > Same set up with Spark build without SPARK_HADOOP_VERSION (or with > SPARK_HADOOP_VERSION=2.0.5-alpha) > is working fine ! > > Some details : > > pmtx-master01 : master > pmtx-master02 : slave > > (behavior is the same if i launch both master and slave from the same box) > > Building HADOOP 2.2.0 support : > > fresh install on pmtx-master01 : > # SPARK_HADOOP_VERSION=2.2.0 sbt/sbt assembly > build successfull > # > > fresh install on pmtx-master02 : > # SPARK_HADOOP_VERSION=2.2.0 sbt/sbt assembly > ...build successfull > # > > On pmtx-master01 : > # ./bin/start-master.sh > starting org.apache.spark.deploy.master.Master, logging to > /cluster/bin/spark-0.8.0-incubating/bin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-pmtx-master01.out > # netstat -an | grep 7077 > tcp6 0 0 10.90.XX.XX:7077:::*LISTEN > # > > On pmtx-master02 : > # nc -v pmtx-master01 7077 > pmtx-master01 [10.90.XX.XX] 7077 (?) open > # ./spark-class org.apache.spark.deploy.worker.Worker > spark://pmtx-master01:7077 > 13/11/22 10:57:50 INFO Slf4jEventHandler: Slf4jEventHandler started > 13/11/22 10:57:50 INFO Worker: Starting Spark worker pmtx-master02:42271 > with 8 cores, 22.6 GB RAM > 13/11/22 10:57:50 INFO Worker: Spark home: /cluster/bin/spark > 13/11/22 10:57:50 INFO WorkerWebUI: Started Worker web UI at > http://pmtx-master02:8081 > 13/11/22 10:57:50 INFO Worker: Connecting to master > spark://pmtx-master01:7077 > 13/11/22 10:57:50 ERROR Worker: Connection to master failed! Shutting down. > # > > With spark-shell on pmtx-master02 : > # MASTER=spark://pmtx-master01:7077 ./spark-shell > Welcome to > __ > / __/__ ___ _/ /__ > _\ \/ _ \/ _ `/ __/ '_/ > /___/ .__/\_,_/_/ /_/\_\ version 0.8.0 > /_/ > > Using Scala version 2.9.3 (Java HotSpot(TM) 64-Bit Server VM, Java > 1.6.0_31) > Initializing interpreter... > Creating SparkContext... > 13/11/22 11:19:29 INFO Slf4jEventHandler: Slf4jEventHandler started > 13/11/22 11:19:29 INFO SparkEnv: Registering BlockManagerMaster > 13/11/22 11:19:29 INFO MemoryStore: MemoryStore started with capacity > 323.9 MB. > 13/11/22 11:19:29 INFO DiskStore: Created local directory at > /tmp/spark-local-20131122111929-3e3c > 13/11/22 11:19:29 INFO ConnectionManager: Bound socket to port 42249 with > id = ConnectionManagerId(pmtx-master02,42249) > 13/11/22 11:19:29 INFO BlockManagerMaster: Trying to register BlockManager > 13/11/22 11:19:29 INFO BlockManagerMaster: Registered BlockManager > 13/11/22 11:19:29 INFO HttpBroadcast: Broadcast server started at > http://10.90.66.67:52531 > 13/11/22 11:19:29 INFO SparkEnv: Registering MapOutputTracker > 13/11/22 11:19:29 INFO HttpFileServer: HTTP File server directory is > /tmp/spark-40525f81-f883-45d5-92ad-bbff44ecf435 > 13/11/22 11:19:29 INFO SparkUI: Started Spark Web UI at > http://pmtx-master02:4040 > 13/11/22 11:19:29 INFO Client$ClientActor: Connecting to master > spark://pmtx-master01:7077 > 13/11/22 11:19:30 ERROR Client$ClientActor: Connection to master failed; > stopping client > 13/11/22 11:19:30 ERROR SparkDeploySchedulerBackend: Disconnected from > Spark cluster! 
> 13/11/22 11:19:30 ERROR ClusterScheduler: Exiting due to error from > cluster scheduler: Disconnected from Spark cluster > > snip > > WORKING : Building HADOOP 2.0.5-alpha support > > On pmtx-master01, now im building hadoop 2.0.5-alpha : > # sbt/sbt clean > ... > # SPARK_HADOOP_VERSION=2.0.5-alpha sbt/sbt assembly > ... > # ./bin/start-master.sh > starting org.apache.spark.deploy.master.Master, logging to > /cluster/bin/spark-0.8.0-incubating/bin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-pmtx-master01.out > > Same build on pmtx-master02 : > # sbt/sbt clean > ... build successfull ... > # SPARK_HADOOP_VERSION=2.0.5-alpha sbt/sbt assembly > ... build successfull ... > # ./spark-class org.apache.spark.deploy.worker.Worker > spark://pmtx-master01:7077 > 13/11/22 11:25:34 INFO Slf4jEventHandler: Slf4jEventHandler started > 13/11/22 11:25:34 INFO Worker: Starting Spark worker pmtx-master02:33768 > with 8 cores, 22.6 GB RAM > 13/11/22 11:25:34 INFO Worker: Spark home: /cluster/bin
Re: Joining files
It seems to me that what you want is the following procedure:
- parse each file line by line
- generate key, value pairs
- join by key

I think the following should accomplish what you're looking for:

val students = sc.textFile("./students.txt")   // mapping over this RDD already maps over lines
val courses = sc.textFile("./courses.txt")     // mapping over this RDD already maps over lines
val left = students.map( x => {
  val columns = x.split(",")
  (columns(1), (columns(0), columns(2)))
} )
val right = courses.map( x => {
  val columns = x.split(",")
  (columns(0), columns(1))
} )
val joined = left.join(right)

The major difference is selectively returning only the fields you actually want to join on, rather than all the fields. A secondary difference is syntactic: you don't need a .map().map() since you can use a slightly more complex function block, as illustrated. I think Spark is smart enough to optimize the .map().map() into basically what I've written explicitly...

Horia

On Mon, Nov 18, 2013 at 10:34 PM, Something Something < mailinglist...@gmail.com> wrote:
> Was my question so dumb? Or, is this not a good use case for Spark?
>
> On Sun, Nov 17, 2013 at 11:41 PM, Something Something < mailinglist...@gmail.com> wrote:
>
>> I am a newbie to both Spark & Scala, but I've been working with
>> Hadoop/Pig for quite some time.
>>
>> We've quite a few ETL processes running in production that use Pig, but
>> now we're evaluating Spark to see if they would indeed run faster.
>>
>> A very common use case in our Pig scripts is joining a file containing
>> Facts to a file containing Dimension data. The joins are, of course, inner,
>> left & outer.
>>
>> I thought I would start simple. Let's say I've 2 files:
>>
>> 1) Students: student_id, course_id, score
>> 2) Course: course_id, course_title
>>
>> We want to produce a file that contains: student_id, course_title, score
>>
>> (Note: This is a hypothetical case. The real files have millions of
>> facts & thousands of dimensions.)
>>
>> Would something like this work? Note: I did say I am a newbie ;)
>>
>> val students = sc.textFile("./students.txt")
>> val courses = sc.textFile("./courses.txt")
>> val s = students.map(x => x.split(','))
>> val left = students.map(x => x.split(',')).map(y => (y(1), y))
>> val right = courses.map(x => x.split(',')).map(y => (y(0), y))
>> val joined = left.join(right)
>>
>> Any pointers in this regard would be greatly appreciated. Thanks.
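Since your original note also mentioned left and outer joins, here is a hedged sketch of how the same pair RDDs extend to those. It assumes the left and right RDDs defined above, and the output path is made up purely for illustration:

// Inner join: students whose course_id has no match in courses.txt are dropped.
val innerJoined = left.join(right)            // (course_id, ((student_id, score), course_title))

// Left outer join: every student is kept; the course title becomes an Option.
val leftJoined = left.leftOuterJoin(right)    // (course_id, ((student_id, score), Option[course_title]))

// Shape the result into (student_id, course_title, score), substituting a marker
// when the dimension row is missing.
val report = leftJoined.map { case (_, ((studentId, score), titleOpt)) =>
  (studentId, titleOpt.getOrElse("UNKNOWN_COURSE"), score)
}
report.saveAsTextFile("./student_course_scores")   // placeholder output path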
Re: Receiving intermediary results of a Spark operation
I may be wrong here... It seems to me that the use of such functionality is contrary to the paradigm that Spark enforces. Here is why I say that: Spark doesn't execute transformations, such as 'map', until an action is requested, such as 'collect'. Therefore, explicitly acting on partially completed chunks of a 'map' call from the driver seems counter to the Spark MO.

-- Horia

On Nov 4, 2013 12:22 PM, "Markus Losoi" wrote:
> Hi
>
> Is it possible for a driver program to receive intermediary results of a
> Spark operation? If, e.g., a long map() operation is in progress, can the
> driver become aware of some of the (key, value) pairs before all of them are
> computed?
>
> There seems to be a SparkListener interface that has an onTaskEnd() event [1].
> However, the documentation is somewhat sparse on what kind of information is
> included in a SparkListenerTaskEnd object [2].
>
> [1] http://spark.incubator.apache.org/docs/0.8.0/api/core/org/apache/spark/scheduler/SparkListener.html
> [2] http://spark.incubator.apache.org/docs/0.8.0/api/core/org/apache/spark/scheduler/SparkListenerTaskEnd.html
>
> Best regards,
> Markus Losoi (markus.lo...@gmail.com)
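To make the laziness concrete, here is a small sketch (plain Spark/Scala, nothing specific to your job) showing that no map work happens, and therefore no partial map output exists to hand back, until an action runs:

// Build a lineage: parallelize and map return immediately without running any tasks.
val data = sc.parallelize(1 to 1000000, 8)

val transformed = data.map { x =>
  // imagine an expensive per-record computation here
  x.toLong * x
}
// At this point nothing has executed, so there are no "partial" results anywhere.

// Only the action triggers scheduling and execution of the tasks.
val total = transformed.reduce(_ + _)
println(total)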
Re: Wrong result with mapPartitions example
Silly question: does sc.parallelize guarantee that the items are always distributed evenly across the partitions? It seems to me that, in the example above, all four items were assigned to the same partition. Have you tried the same thing with many more items?

On Sep 26, 2013 9:01 PM, "Shangyu Luo" wrote:
> Hi,
> I am trying to test the mapPartitions function in the Spark Python version, but I
> got a wrong result. More specifically, in the pyspark shell:
> >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
> >>> def f(iterator): yield sum(iterator)
> ...
> >>> rdd.mapPartitions(f).collect()
> The result is [0, 10], not [3, 7].
> Is there anything wrong with my code?
> Thanks!
>
> --
> Shangyu, Luo
> Department of Computer Science
> Rice University
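For what it's worth, here is a Scala sketch of the same experiment (I believe pyspark exposes glom() as well) that shows exactly how the four elements landed in the two partitions:

val rdd = sc.parallelize(Seq(1, 2, 3, 4), 2)

// glom() collapses each partition into an array, so this prints the actual
// placement, e.g. [1,2] [3,4] for an even split, or [] [1,2,3,4] if everything
// ended up in one partition.
println(rdd.glom().collect().map(_.mkString("[", ",", "]")).mkString(" "))

// Per-partition sums: an even split should print 3,7.
val perPartitionSums = rdd.mapPartitions(iter => Iterator(iter.sum))
println(perPartitionSums.collect().mkString(","))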
Re: RDD function question
Stepping away from any particular framework, it seems to me that you can never guarantee reading only the rows in that date range. Even with a sorted array, you need an O(log N) binary search to find each of your boundary dates, unless you maintain explicit pointers to those boundaries. That turns out to be moot, because a) your dates change dynamically, so updating the pointers while maintaining sorted order costs at least O(log N) anyway, and b) you are dealing with files, not arrays, and seeking to a particular line in a file means reading one line at a time, in O(N).

So, back to your Spark-specific question: you cannot do better than O(N) with a file anyway, so why worry about anything more sophisticated than a 'filter' transformation?

On Sep 16, 2013 3:51 PM, "Satheessh" wrote:
> 1. The date is dynamic (i.e. if the date is changed we shouldn't read all
> records). It looks like the solution below will read all the records if the date
> is changed. (Please correct me if I am wrong.)
>
> 2. We can assume the file is sorted by date.
>
> Sent from my iPhone
>
> On Sep 16, 2013, at 5:27 PM, Horia wrote:
>
> Without sorting, you can implement this using the 'filter' transformation.
> This will eventually read all the rows once, but subsequently only shuffle
> and send the transformed data that passed the filter.
>
> Does this help, or did I misunderstand?
> On Sep 16, 2013 1:37 PM, "satheessh chinnu" wrote:
>
>> I have a text file. Each line is a record and the first ten characters
>> on each line are a date in YYYY-MM-DD format.
>>
>> I would like to run a map function on this RDD with a specific date range
>> (i.e. from 2005-01-01 to 2007-12-31). I would like to avoid reading the
>> records outside the specified date range (i.e. a kind of primary index sorted
>> by date).
>>
>> Is there a way to implement this?
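To make the 'filter' suggestion concrete, here is a hedged sketch under the assumptions in your original post (each line starts with a ten-character YYYY-MM-DD date); the file path and the date bounds are placeholders:

val lines = sc.textFile("./records.txt")   // placeholder path

val startDate = "2005-01-01"
val endDate   = "2007-12-31"

// Zero-padded ISO dates compare correctly as plain strings, so the range check
// needs no date parsing; Spark still reads every line once, as discussed above.
val inRange = lines.filter { line =>
  val date = line.take(10)
  date >= startDate && date <= endDate
}

// Apply whatever map function you need to just the in-range records.
println(inRange.count())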
Re: RDD function question
Without sorting, you can implement this using the 'filter' transformation. This will eventually read all the rows once, but subsequently only shuffle and send the transformed data that passed the filter.

Does this help, or did I misunderstand?

On Sep 16, 2013 1:37 PM, "satheessh chinnu" wrote:
> I have a text file. Each line is a record and the first ten characters
> on each line are a date in YYYY-MM-DD format.
>
> I would like to run a map function on this RDD with a specific date range
> (i.e. from 2005-01-01 to 2007-12-31). I would like to avoid reading the
> records outside the specified date range (i.e. a kind of primary index sorted
> by date).
>
> Is there a way to implement this?
Master Master Replication
Hey all,

Is there a tutorial on how to configure master-master replication in Spark on a cluster, in native mode, not on top of Mesos?

Thanks very much,
Horia Margarit