Re: Web UI and standalone apps

2013-12-27 Thread Horia
Hi Aureliano,

A Spark Application is defined by everything executed within a given
SparkContext. This application's web server runs on port 4040 of the
machine where the driver of the application is executing. An example
driver of a Spark Application is a single instance of the Spark Shell. This
web UI, on port 4040, displays statistics about the Application, such as the
stages being executed, the number of tasks per stage, and the progress of
the tasks within a stage. Other Application statistics include the caching
locations and percentages of the RDDs used within the Application (and
across the stages of that Application) and the garbage-collection times of
the tasks that have completed.

The Spark Cluster is defined by all Applications executing on top of the
resources provisioned to your particular deployment of Spark. These
resources are managed by a Spark Master, which contains the task scheduler
and the cluster manager (unless you're using YARN or Mesos, in which case
they provide the cluster manager). The UI on port 8080 is the UI of
the Spark Master, and it is accessible on whichever node is currently
running the Spark Master. This UI displays cluster statistics such as the
number of available worker nodes, the number of JVM executor processes per
worker node, the number of running Applications using this Cluster, et
cetera.

In short, shutting down a Spark Application will kill the UI on port 4040
because your application is terminated and therefore there are no running
statistics to collect about that application. However, the UI on port 8080
continues to be up and report cluster-wide statistics until you kill the
cluster by killing Spark Master.
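
If it helps to make that concrete, here is a minimal sketch of the kind of
standalone app I mean (the master URL, app name, and workload are placeholders,
not anything from your setup):

import org.apache.spark.SparkContext

// Sketch only: assumes a standalone Master already running on this host.
object UiDemo {
  def main(args: Array[String]) {
    // While this context is alive, the driver serves the Application UI on
    // port 4040 of whatever machine runs this main(). The Master UI on
    // port 8080 is served by the Master process and is independent of it.
    val sc = new SparkContext("spark://localhost:7077", "ui-demo")

    sc.parallelize(1 to 1000000).map(_ * 2).count()  // something to show in the stages page

    sc.stop()  // the port 4040 UI disappears here; the port 8080 UI stays up
  }
}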

Hope that long-winded explanation made sense!
Happy Holidays!



On Fri, Dec 27, 2013 at 9:23 AM, Aureliano Buendia wrote:

> Hi,
>
>
> I'm a bit confused about web UI access for a standalone Spark app.
>
> - When running a spark app, a web server is launched at localhost:4040.
> When the standalone app execution is finished, the web server is shut down.
> What's the use of this web server? There is no way of reviewing the data
> when the standalone app exits.
>
> - Creating SparkContext at spark://localhost:7077 creates another web UI.
> Is this web UI supposed to be used with localhost:4040, or is it a
> replacement?
>
> - Creating a context with spark://localhost:7077, and after running
> ./bin/start-all.sh, I get this warning:
>
> WARN ClusterScheduler: Initial job has not accepted any resources; check
> your cluster UI to ensure that workers are registered and have sufficient
> memory
>


Re: Worker failed to connect when build with SPARK_HADOOP_VERSION=2.2.0

2013-12-03 Thread Horia
That's strange behavior. Spark has no problem connecting to the HDFS
NameNode (v2.2.0) and reading and writing files, but this only works in
the Spark shell (and in the PySpark shell).
The Spark workers' failure to connect to the Spark master shouldn't have
anything to do with the version of Hadoop against which Spark is
compiled... or am I completely missing something?



On Mon, Dec 2, 2013 at 4:11 AM, Maxime Lemaire wrote:

> Horia,
> if you don't need YARN support you can get it to work by setting SPARK_YARN to
> false:
> *SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=false sbt/sbt assembly*
>
> Raymond,
> OK, thank you, so that's why; I'm using the latest release, 0.8.0 (September
> 25, 2013)
>
>
>
>
> 2013/12/2 Liu, Raymond 
>
> What version of the code are you using?
>>
>> 2.2.0 support not yet merged into trunk. Check out
>> https://github.com/apache/incubator-spark/pull/199
>>
>> Best Regards,
>> Raymond Liu
>>
>> From: horia@gmail.com [mailto:horia@gmail.com] On Behalf Of Horia
>> Sent: Monday, December 02, 2013 3:00 PM
>> To: user@spark.incubator.apache.org
>> Subject: Re: Worker failed to connect when build with
>> SPARK_HADOOP_VERSION=2.2.0
>>
>> Has this been resolved?
>>
>> Forgive me if I missed the follow-up but I've been having the exact same
>> problem.
>>
>> - Horia
>>
>>
>> On Fri, Nov 22, 2013 at 5:38 AM, Maxime Lemaire 
>> wrote:
>> Hi all,
>> When I build Spark with Hadoop 2.2.0 support, workers can't connect to the
>> Spark master anymore.
>> Network is up and hostnames are correct. Tcpdump can clearly see workers
>> trying to connect (tcpdump outputs at the end).
>>
>> The same setup with a Spark build without SPARK_HADOOP_VERSION (or
>> with SPARK_HADOOP_VERSION=2.0.5-alpha) works fine!
>>
>> Some details :
>>
>> pmtx-master01 : master
>> pmtx-master02 : slave
>>
>> (behavior is the same if I launch both master and slave from the same box)
>>
>> Building HADOOP 2.2.0 support :
>>
>> fresh install on pmtx-master01 :
>> # SPARK_HADOOP_VERSION=2.2.0 sbt/sbt assembly
>> build successfull
>> #
>>
>> fresh install on pmtx-master02 :
>> # SPARK_HADOOP_VERSION=2.2.0 sbt/sbt assembly
>> ...build successfull
>> #
>>
>> On pmtx-master01 :
>> # ./bin/start-master.sh
>> starting org.apache.spark.deploy.master.Master, logging to
>> /cluster/bin/spark-0.8.0-incubating/bin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-pmtx-master01.out
>> # netstat -an | grep 7077
>> tcp6       0      0 10.90.XX.XX:7077        :::*        LISTEN
>> #
>>
>> On pmtx-master02 :
>> # nc -v pmtx-master01 7077
>> pmtx-master01 [10.90.XX.XX] 7077 (?) open
>> # ./spark-class org.apache.spark.deploy.worker.Worker
>> spark://pmtx-master01:7077
>> 13/11/22 10:57:50 INFO Slf4jEventHandler: Slf4jEventHandler started
>> 13/11/22 10:57:50 INFO Worker: Starting Spark worker pmtx-master02:42271
>> with 8 cores, 22.6 GB RAM
>> 13/11/22 10:57:50 INFO Worker: Spark home: /cluster/bin/spark
>> 13/11/22 10:57:50 INFO WorkerWebUI: Started Worker web UI at
>> http://pmtx-master02:8081
>> 13/11/22 10:57:50 INFO Worker: Connecting to master
>> spark://pmtx-master01:7077
>> 13/11/22 10:57:50 ERROR Worker: Connection to master failed! Shutting
>> down.
>> #
>>
>> With spark-shell on pmtx-master02 :
>> # MASTER=spark://pmtx-master01:7077 ./spark-shell
>> Welcome to
>>       ____              __
>>      / __/__  ___ _____/ /__
>>     _\ \/ _ \/ _ `/ __/  '_/
>>    /___/ .__/\_,_/_/ /_/\_\   version 0.8.0
>>       /_/
>>
>> Using Scala version 2.9.3 (Java HotSpot(TM) 64-Bit Server VM, Java
>> 1.6.0_31)
>> Initializing interpreter...
>> Creating SparkContext...
>> 13/11/22 11:19:29 INFO Slf4jEventHandler: Slf4jEventHandler started
>> 13/11/22 11:19:29 INFO SparkEnv: Registering BlockManagerMaster
>> 13/11/22 11:19:29 INFO MemoryStore: MemoryStore started with capacity
>> 323.9 MB.
>> 13/11/22 11:19:29 INFO DiskStore: Created local directory at
>> /tmp/spark-local-20131122111929-3e3c
>> 13/11/22 11:19:29 INFO ConnectionManager: Bound socket to port 42249 with
>> id = ConnectionManagerId(pmtx-master02,42249)
>> 13/11/22 11:19:29 INFO BlockManagerMaster: Trying to register BlockManager
>> 13/11/22 11:19:29 INFO BlockManagerMaster: Registered BlockManager
>> 13/11/22 11:19:29 INFO HttpBroadcast: Broadcast server started at
>> http://10.90.66.67:52531
>> 13/11/22 11:19:29 INFO SparkEnv: Registering

Re: Worker failed to connect when build with SPARK_HADOOP_VERSION=2.2.0

2013-12-01 Thread Horia
Has this been resolved?

Forgive me if I missed the follow-up but I've been having the exact same
problem.

- Horia



On Fri, Nov 22, 2013 at 5:38 AM, Maxime Lemaire wrote:

> Hi all,
> When I build Spark with Hadoop 2.2.0 support, workers can't connect to the
> Spark master anymore.
> Network is up and hostnames are correct. Tcpdump can clearly see workers
> trying to connect (tcpdump outputs at the end).
>
> The same setup with a Spark build without SPARK_HADOOP_VERSION (or with
> SPARK_HADOOP_VERSION=2.0.5-alpha)
> works fine!
>
> Some details :
>
> pmtx-master01 : master
> pmtx-master02 : slave
>
> (behavior is the same if I launch both master and slave from the same box)
>
> Building HADOOP 2.2.0 support :
>
> fresh install on pmtx-master01 :
> # SPARK_HADOOP_VERSION=2.2.0 sbt/sbt assembly
> build successfull
> #
>
> fresh install on pmtx-master02 :
> # SPARK_HADOOP_VERSION=2.2.0 sbt/sbt assembly
> ...build successfull
> #
>
> On pmtx-master01 :
> # ./bin/start-master.sh
> starting org.apache.spark.deploy.master.Master, logging to
> /cluster/bin/spark-0.8.0-incubating/bin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-pmtx-master01.out
> # netstat -an | grep 7077
> tcp6       0      0 10.90.XX.XX:7077        :::*        LISTEN
> #
>
> On pmtx-master02 :
> # nc -v pmtx-master01 7077
> pmtx-master01 [10.90.XX.XX] 7077 (?) open
> # ./spark-class org.apache.spark.deploy.worker.Worker
> spark://pmtx-master01:7077
> 13/11/22 10:57:50 INFO Slf4jEventHandler: Slf4jEventHandler started
> 13/11/22 10:57:50 INFO Worker: Starting Spark worker pmtx-master02:42271
> with 8 cores, 22.6 GB RAM
> 13/11/22 10:57:50 INFO Worker: Spark home: /cluster/bin/spark
> 13/11/22 10:57:50 INFO WorkerWebUI: Started Worker web UI at
> http://pmtx-master02:8081
> 13/11/22 10:57:50 INFO Worker: Connecting to master
> spark://pmtx-master01:7077
> 13/11/22 10:57:50 ERROR Worker: Connection to master failed! Shutting down.
> #
>
> With spark-shell on pmtx-master02 :
> # MASTER=spark://pmtx-master01:7077 ./spark-shell
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 0.8.0
>       /_/
>
> Using Scala version 2.9.3 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.6.0_31)
> Initializing interpreter...
> Creating SparkContext...
> 13/11/22 11:19:29 INFO Slf4jEventHandler: Slf4jEventHandler started
> 13/11/22 11:19:29 INFO SparkEnv: Registering BlockManagerMaster
> 13/11/22 11:19:29 INFO MemoryStore: MemoryStore started with capacity
> 323.9 MB.
> 13/11/22 11:19:29 INFO DiskStore: Created local directory at
> /tmp/spark-local-20131122111929-3e3c
> 13/11/22 11:19:29 INFO ConnectionManager: Bound socket to port 42249 with
> id = ConnectionManagerId(pmtx-master02,42249)
> 13/11/22 11:19:29 INFO BlockManagerMaster: Trying to register BlockManager
> 13/11/22 11:19:29 INFO BlockManagerMaster: Registered BlockManager
> 13/11/22 11:19:29 INFO HttpBroadcast: Broadcast server started at
> http://10.90.66.67:52531
> 13/11/22 11:19:29 INFO SparkEnv: Registering MapOutputTracker
> 13/11/22 11:19:29 INFO HttpFileServer: HTTP File server directory is
> /tmp/spark-40525f81-f883-45d5-92ad-bbff44ecf435
> 13/11/22 11:19:29 INFO SparkUI: Started Spark Web UI at
> http://pmtx-master02:4040
> 13/11/22 11:19:29 INFO Client$ClientActor: Connecting to master
> spark://pmtx-master01:7077
> 13/11/22 11:19:30 ERROR Client$ClientActor: Connection to master failed;
> stopping client
> 13/11/22 11:19:30 ERROR SparkDeploySchedulerBackend: Disconnected from
> Spark cluster!
> 13/11/22 11:19:30 ERROR ClusterScheduler: Exiting due to error from
> cluster scheduler: Disconnected from Spark cluster
>
>  snip 
>
> WORKING : Building HADOOP 2.0.5-alpha support
>
> On pmtx-master01, I'm now building with Hadoop 2.0.5-alpha support:
> # sbt/sbt clean
> ...
> # SPARK_HADOOP_VERSION=2.0.5-alpha sbt/sbt assembly
> ...
> # ./bin/start-master.sh
> starting org.apache.spark.deploy.master.Master, logging to
> /cluster/bin/spark-0.8.0-incubating/bin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-pmtx-master01.out
>
> Same build on pmtx-master02 :
> # sbt/sbt clean
> ... build successfull ...
> # SPARK_HADOOP_VERSION=2.0.5-alpha sbt/sbt assembly
> ... build successfull ...
> # ./spark-class org.apache.spark.deploy.worker.Worker
> spark://pmtx-master01:7077
> 13/11/22 11:25:34 INFO Slf4jEventHandler: Slf4jEventHandler started
> 13/11/22 11:25:34 INFO Worker: Starting Spark worker pmtx-master02:33768
> with 8 cores, 22.6 GB RAM
> 13/11/22 11:25:34 INFO Worker: Spark home: /cluster/bin

Re: Joining files

2013-11-18 Thread Horia
It seems to me that what you want is the following procedure:
- parse each file line by line
- generate key, value pairs
- join by key

I think the following should accomplish what you're looking for:

val students = sc.textFile("./students.txt")  // mapping over this RDD already maps over lines
val courses = sc.textFile("./courses.txt")    // mapping over this RDD already maps over lines
val left = students.map( x => {
  val columns = x.split(",")
  (columns(1), (columns(0), columns(2)))   // key by course_id, keep (student_id, score)
} )
val right = courses.map( x => {
  val columns = x.split(",")
  (columns(0), columns(1))                 // key by course_id, keep course_title
} )
val joined = left.join(right)


The major difference is selectively returning the fields which you actually
want to join, rather than all the fields. A secondary difference is
syntactic: you don't need a .map().map() since you can use a slightly more
complex function block as illustrated. I think Spark is smart enough to
optimize the .map().map() to basically what I've explicitly written...
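
If you then want the (student_id, course_title, score) triples described in
the original question, one more map over the join result should do it. A
sketch, assuming the column layout above (the output path is only an example):

// joined has type RDD[(String, ((String, String), String))], i.e.
// (course_id, ((student_id, score), course_title))
val result = joined.map { case (courseId, ((studentId, score), courseTitle)) =>
  (studentId, courseTitle, score)
}

// For example, write it back out as comma-separated lines:
result.map { case (id, title, score) => id + "," + title + "," + score }
  .saveAsTextFile("./students_with_courses")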

Horia



On Mon, Nov 18, 2013 at 10:34 PM, Something Something <
mailinglist...@gmail.com> wrote:

> Was my question so dumb?  Or, is this not a good use case for Spark?
>
>
> On Sun, Nov 17, 2013 at 11:41 PM, Something Something <
> mailinglist...@gmail.com> wrote:
>
>> I am a newbie to both Spark & Scala, but I've been working with
>> Hadoop/Pig for quite some time.
>>
>> We've quite a few ETL processes running in production that use Pig, but
>> now we're evaluating Spark to see if they would indeed run faster.
>>
>> A very common use case in our Pig script is joining a file containing
>> Facts to a file containing Dimension data.  The joins are of course, inner,
>> left & outer.
>>
>> I thought I would start simple.  Let's say I've 2 files:
>>
>> 1)  Students:  student_id, course_id, score
>> 2)  Course:  course_id, course_title
>>
>> We want to produce a file that contains:  student_id, course_title, score
>>
>> (Note:  This is a hypothetical case.  The real files have millions of
>> facts & thousands of dimensions)
>>
>> Would something like this work?  Note:  I did say I am a newbie ;)
>>
>> val students = sc.textFile("./students.txt")
>> val courses = sc.textFile("./courses.txt")
>> val s = students.map(x => x.split(','))
>> val left = students.map(x => x.split(',')).map(y => (y(1), y))
>> val right = courses.map(x => x.split(',')).map(y => (y(0), y))
>> val joined = left.join(right)
>>
>>
>> Any pointers in this regard would be greatly appreciated.  Thanks.
>>
>
>


Re: Receiving intermediary results of a Spark operation

2013-11-04 Thread Horia
I may be wrong here... It seems to me that the use of such functionality is
contrary to the paradigm that Spark enforces.

Here is why I say that: Spark doesn't execute transformations, such as
'map', until an action is requested, such as 'collect'. Therefore,
explicitly performing computations on partially completed chunks of a
'map' call seems counter to the Spark MO.
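
As a toy illustration of that laziness (just a sketch; nothing runs at the
map() call, and the whole computation happens inside the collect()):

// The map below is only recorded as a transformation...
val doubled = sc.parallelize(1 to 4).map(_ * 2)

// ...and nothing executes until an action like collect() pulls the results back:
val result = doubled.collect()   // Array(2, 4, 6, 8)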

-- Horia
 On Nov 4, 2013 12:22 PM, "Markus Losoi"  wrote:

> Hi
>
> Is it possible for a driver program to receive intermediary results of a
> Spark operation? If, e.g., a long map() operation is in progress, can the
> driver become aware of some of the (key, value) pairs before all of them
> are
> computed?
>
> There seems to be SparkListener interface that has an onTaskEnd() event
> [1].
> However, the documentation is somewhat sparse on what kind of information
> is
> included in a SparkListenerTaskEnd object [2].
>
> [1]
>
> http://spark.incubator.apache.org/docs/0.8.0/api/core/org/apache/spark/scheduler/SparkListener.html
> [2]
>
> http://spark.incubator.apache.org/docs/0.8.0/api/core/org/apache/spark/scheduler/SparkListenerTaskEnd.html
>
> Best regards,
> Markus Losoi (markus.lo...@gmail.com)
>
>


Re: Wrong result with mapPartitions example

2013-09-26 Thread Horia
Silly question: does sc.parallelize guarantee that the items are always
distributed equally across the partitions?

It seems to me that, in the example above, all four items were assigned to
the same partition. Have you tried the same with many more items?
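
One quick way to check is glom(), which returns the contents of each
partition. A sketch from the Scala shell (PySpark exposes the same glom()
call; the values and partition count below are just the ones from your example):

// Assumes a running shell with a SparkContext named sc.
val rdd = sc.parallelize(Seq(1, 2, 3, 4), 2)

// Print which elements landed in each partition.
rdd.glom().collect().foreach(p => println(p.mkString("[", ", ", "]")))

// Sum each partition; with an even split this yields Array(3, 7).
rdd.mapPartitions(it => Iterator(it.sum)).collect()
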
On Sep 26, 2013 9:01 PM, "Shangyu Luo"  wrote:

> Hi,
> I am trying to test the mapPartitions function in the Spark Python version, but I
> got the wrong result.
> More specifically, in pyspark shell:
> >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
> >>> def f(iterator): yield sum(iterator)
> ...
> >>> rdd.mapPartitions(f).collect()
> The result is [0, 10], not [3, 7]
> Is there anything wrong with my code?
> Thanks!
>
>
> --
> --
>
> Shangyu, Luo
> Department of Computer Science
> Rice University
>
>


Re: RDD function question

2013-09-16 Thread Horia
Stepping away from any particular framework, it seems to me that you can
never guarantee that you only read rows in that date range.

Even with a sorted array, you need a log(N) binary search to find each of
your boundary dates, unless you maintain explicit pointers to those
boundaries, which turns out to be moot because (a) your dates change
dynamically, so updating the pointers while maintaining sorted order takes
at least log(N) operations anyway, and (b) you are dealing with files, not
arrays, and seeking to a particular line of a file is O(N).

So, back to your Spark-specific question: you cannot do better than O(N)
with a file anyway, so why worry about anything more sophisticated than a
'filter' transformation?
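
Concretely, the filter is just a comparison on the leading ten characters of
each line; a rough sketch (the file path is made up, and plain string
comparison works because YYYY-MM-DD dates sort lexicographically):

// Sketch only: assumes each line starts with a YYYY-MM-DD date.
val lines = sc.textFile("hdfs:///data/records.txt")

val from = "2005-01-01"
val to   = "2007-12-31"

val inRange = lines.filter { line =>
  val date = line.take(10)
  date >= from && date <= to
}

// Any subsequent map only sees the rows that passed the filter.
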
 On Sep 16, 2013 3:51 PM, "Satheessh"  wrote:

> 1. The date is dynamic (i.e., if the date changes we shouldn't read all
> records).
> It looks like the solution below will read all the records if the date is changed.
> (Please correct me if I am wrong.)
>
> 2. We can assume file is sorted by date.
>
> Sent from my iPhone
>
> On Sep 16, 2013, at 5:27 PM, Horia  wrote:
>
> Without sorting, you can implement this using the 'filter' transformation.
>
> This will eventually read all the rows once, but subsequently only shuffle
> and send the transformed data which passed the filter.
>
> Does this help, or did I misunderstand?
> On Sep 16, 2013 1:37 PM, "satheessh chinnu"  wrote:
>
>> I have a text file. Each line is a record, and the first ten characters
>> of each line are a date in YYYY-MM-DD format.
>>
>> I would like to run a map function on this RDD with a specific date range
>> (i.e., from 2005-01-01 to 2007-12-31). I would like to avoid reading the
>> records outside the specified date range (i.e., a kind of primary index sorted
>> by date).
>>
>> Is there a way to implement this?
>>
>>
>>


Re: RDD function question

2013-09-16 Thread Horia
Without sorting, you can implement this using the 'filter' transformation.

This will eventually read all the rows once, but subsequently only shuffle
and send the transformed data which passed the filter.

Does this help, or did I misunderstand?
On Sep 16, 2013 1:37 PM, "satheessh chinnu"  wrote:

> I have a text file. Each line is a record, and the first ten characters
> of each line are a date in YYYY-MM-DD format.
>
> I would like to run a map function on this RDD with a specific date range
> (i.e., from 2005-01-01 to 2007-12-31). I would like to avoid reading the
> records outside the specified date range (i.e., a kind of primary index sorted
> by date).
>
> Is there a way to implement this?
>
>
>


Master Master Replication

2013-09-15 Thread Horia
Hey all,

Is there a tutorial on how to configure master-master replication in Spark
on a cluster, in native mode (not on top of Mesos)?

Thanks very much,

Horia Margarit