Re: too many small files and task

2014-12-19 Thread bethesda
I recently had the same problem.  I'm not an expert, but I suggest
concatenating your files into a smaller number of larger files; e.g. on Linux:
cat files > a_larger_file.  This helped greatly.

Others better qualified will likely weigh in on this later, but that should
get you started.
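
If you would rather not rewrite the files on disk, another option I have seen suggested (a minimal sketch only; the path and the partition count below are made up) is to shrink the partition count right after reading, so the many small files don't each become their own task downstream:

val data = sc.textFile("/user/foo/smallfiles/*")  // one partition per small file
val fewer = data.coalesce(16)                     // merge into ~16 partitions, no shuffle
fewer.count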

D



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/too-many-small-files-and-task-tp20776p20783.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: reading files recursively using spark

2014-12-19 Thread bethesda
On HDFS I created:

/one/one.txt      # contains the text "one"
/one/two/two.txt  # contains the text "two"

Then:  

val data = sc.textFile("/one/*")
data.collect

This returned:

Array(one, two)

So the above path designation appears to recurse for you automatically (at least one level down, in this test).
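
For deeper trees, my understanding (untested beyond the toy layout above) is that each extra level can be addressed with its own glob, and that several patterns can go into one comma-separated string:

// pick up .txt files at two different depths in one call (paths from the toy layout above)
val both = sc.textFile("/one/*.txt,/one/*/*.txt")
both.collect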



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/reading-files-recursively-using-spark-tp20782p20784.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Fetch Failure

2014-12-19 Thread bethesda
I have a job that runs fine on relatively small input datasets but then
reaches a threshold where I begin to consistently get "Fetch failure" as the
Failure Reason, late in the job, during a saveAsTextFile() operation.

The first error we see on the "Details for Stage" page is
ExecutorLostFailure.

My Shuffle Read is 3.3 GB, and that's the only thing that seems high.  We have
three servers, each configured with 5g of memory for this job, and the job is
running in spark-shell.  The first error in the shell is "Lost executor 2
on (servername): remote Akka client disassociated".

We are still learning how best to diagnose jobs using the web UI, so it's
likely that there's some helpful information here that we just don't know how
to interpret.  Is there any kind of troubleshooting guide beyond the Spark
Configuration page?  I'm not sure whether I'm providing enough information here.

thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Fetch-Failure-tp20787.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Creating a smaller, derivative RDD from an RDD

2014-12-18 Thread bethesda
We have a very large RDD and I need to create a new RDD whose values are
derived from each record of the original RDD, and we only retain the few new
records that meet a criterion.  I want to avoid creating a second large RDD
and then filtering it, since I believe this could tax system resources
unnecessarily (tell me if that assumption is wrong).

So for example, and this is just an example, say we have an RDD of the numbers
1 to 1,000,000; we compute each value's md5 hash and keep only the results
that start with 'A'.

What we've tried, which seems to work but seemed a bit ugly and perhaps
inefficient, was the following (in pseudocode).  Is this the best way to do
this?

Thanks

bigRdd.flatMap { i =>
  val h = md5(i)              // md5 is a placeholder for our hashing helper
  if (h.startsWith("A")) {    // keep only hashes that start with 'A'
    Array(h)
  } else {
    Array[String]()           // empty result drops the record
  }
}
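
For comparison, the same thing written with Option instead of an empty Array (a sketch only, with the same assumed md5 helper):

val kept = bigRdd.flatMap { i =>
  val h = md5(i)                            // md5: placeholder hashing helper, as above
  if (h.startsWith("A")) Some(h) else None  // None simply drops the record
}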



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Creating-a-smaller-derivative-RDD-from-an-RDD-tp20769.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Why so many tasks?

2014-12-16 Thread bethesda
Our job is creating what appears to be an inordinate number of very small
tasks, which blow out our OS inode and file limits.  Rather than continually
raising those limits, we are seeking to understand whether our real problem
is that too many tasks are running, perhaps because we are misconfigured or
we are coding incorrectly.

Rather than posting our actual code I have re-created the essence of the
matter in the shell with a directory of files simulating the data we deal
with.  We have three servers, each with 8G RAM.

Given 1,000 files, each containing a string of 100 characters, in the
myfiles directory:

val data = sc.textFile("/user/foo/myfiles/*")

val c = data.count

The count operation produces 1,000 tasks.  Is this normal?

val cart = data.cartesian(data)
cart.count

The cartesian operation produces 1M tasks.  I understand that the cartesian
product of 1,000 items against itself is 1M; however, it seems the overhead
of all this task creation, and the file I/O on all these tiny files, outweighs
the gains of distributed computing.  What am I missing here?

Below is the truncated output of the count operation, if this helps indicate
a configuration problem.

Thank you.

scala> data.count
14/12/16 07:40:46 INFO FileInputFormat: Total input paths to process : 1000
14/12/16 07:40:47 INFO SparkContext: Starting job: count at <console>:15
14/12/16 07:40:47 INFO DAGScheduler: Got job 0 (count at <console>:15) with
1000 output partitions (allowLocal=false)
14/12/16 07:40:47 INFO DAGScheduler: Final stage: Stage 0(count at
<console>:15)
14/12/16 07:40:47 INFO DAGScheduler: Parents of final stage: List()
14/12/16 07:40:47 INFO DAGScheduler: Missing parents: List()
14/12/16 07:40:47 INFO DAGScheduler: Submitting Stage 0
(/user/ds/randomfiles/* MappedRDD[3] at textFile at <console>:12), which has
no missing parents
14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(2400) called with
curMem=507154, maxMem=278019440
14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2 stored as values in
memory (estimated size 2.3 KB, free 264.7 MB)
14/12/16 07:40:47 INFO MemoryStore: ensureFreeSpace(1813) called with
curMem=509554, maxMem=278019440
14/12/16 07:40:47 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes
in memory (estimated size 1813.0 B, free 264.7 MB)
14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
on dev-myserver-1.abc.cloud:44041 (size: 1813.0 B, free: 265.1 MB)
14/12/16 07:40:47 INFO BlockManagerMaster: Updated info of block
broadcast_2_piece0
14/12/16 07:40:47 INFO DAGScheduler: Submitting 1000 missing tasks from
Stage 0 (/user/ds/randomfiles/* MappedRDD[3] at textFile at <console>:12)
14/12/16 07:40:47 INFO TaskSchedulerImpl: Adding task set 0.0 with 1000
tasks
14/12/16 07:40:47 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID
0, dev-myserver-2.abc.cloud, NODE_LOCAL, 1202 bytes)
14/12/16 07:40:47 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID
1, dev-myserver-3.abc.cloud, NODE_LOCAL, 1201 bytes)
14/12/16 07:40:47 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID
2, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
14/12/16 07:40:47 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID
3, dev-myserver-2.abc.cloud, NODE_LOCAL, 1203 bytes)
14/12/16 07:40:47 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID
4, dev-myserver-3.abc.cloud, NODE_LOCAL, 1204 bytes)
14/12/16 07:40:47 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID
5, dev-myserver-1.abc.cloud, NODE_LOCAL, 1203 bytes)
14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
[dev-myserver-3.abc.cloud/10.40.13.192:36133]
14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
[dev-myserver-2.abc.cloud/10.40.13.195:35716]
14/12/16 07:40:47 INFO ConnectionManager: Accepted connection from
[dev-myserver-1.abc.cloud/10.40.13.194:33728]
14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
[dev-myserver-1.abc.cloud/10.40.13.194:49458]
14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
[dev-myserver-3.abc.cloud/10.40.13.192:58579]
14/12/16 07:40:47 INFO SendingConnection: Initiating connection to
[dev-myserver-2.abc.cloud/10.40.13.195:52502]
14/12/16 07:40:47 INFO SendingConnection: Connected to
[dev-myserver-3.abc.cloud/10.40.13.192:58579], 1 messages pending
14/12/16 07:40:47 INFO SendingConnection: Connected to
[dev-myserver-1.abc.cloud/10.40.13.194:49458], 1 messages pending
14/12/16 07:40:47 INFO SendingConnection: Connected to
[dev-myserver-2.abc.cloud/10.40.13.195:52502], 1 messages pending
14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
on dev-myserver-1.abc.cloud:49458 (size: 1813.0 B, free: 1060.0 MB)
14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
on dev-myserver-3.abc.cloud:58579 (size: 1813.0 B, free: 1060.0 MB)
14/12/16 07:40:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory
on dev-myserver-2.abc.cloud:52502 (size: 1813.0 B, free: 1060.0 MB)
14/12/16 07:40:48 INFO 

Re: Why so many tasks?

2014-12-16 Thread bethesda
Thank you!  I had known about the small-files problem in HDFS but didn't
realize that it affected sc.textFile().
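
For anyone who finds this thread later: one workaround we are now looking at (a sketch only; the directory is from my earlier toy example and the second argument is just a partition hint) is reading whole files in one call, which packs many small files into far fewer partitions:

// RDD of (filePath, fileContents) pairs; the 24 is an arbitrary partition hint
val files = sc.wholeTextFiles("/user/foo/myfiles", 24)
files.count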



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-so-many-tasks-tp20712p20717.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Appending an incremental value to each RDD record

2014-12-16 Thread bethesda
I think this is sort of a newbie question, but I've checked the API closely
and don't see an obvious answer:

Given an RDD, how would I create a new RDD of tuples where the first element
of each tuple is an incrementing Int, e.g. 1, 2, 3 ..., and the second element
is the original RDD record?  I'm simply trying to assign a unique ID to each
record in my RDD.  (I want to stay in RDD land and not convert to a List and
back to an RDD, since that seems unnecessary and probably bad form.)
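
Concretely, something shaped like this, just to illustrate the output I'm after (made-up data):

val records = sc.parallelize(Seq("a", "b", "c"))
// desired result: an RDD containing (1,"a"), (2,"b"), (3,"c")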

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Appending-an-incrental-value-to-each-RDD-record-tp20718.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Appending an incremental value to each RDD record

2014-12-16 Thread bethesda
Thanks! zipWithIndex() works well.  I had overlooked it because the name
'zip' is rather odd.
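
For the archives, a minimal sketch (toy data) of how zipWithIndex() maps onto the original question: it pairs each record with a zero-based Long index, so the swap and the +1 below just produce the 1-based, ID-first shape I described:

val records = sc.parallelize(Seq("a", "b", "c"))
// zipWithIndex() gives (record, 0-based Long index); swap and add 1 for a 1-based ID first
val withIds = records.zipWithIndex().map { case (rec, idx) => (idx + 1, rec) }
withIds.collect   // Array((1,a), (2,b), (3,c))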



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Appending-an-incrental-value-to-each-RDD-record-tp20718p20722.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Best practice for multi-user web controller in front of Spark

2014-11-11 Thread bethesda
We are relatively new to Spark and so far, during our development process,
have been manually submitting single jobs at a time for ML training, using
spark-submit.  Each job accepts a small user-submitted data set and compares
it to every data set in our HDFS corpus, which only changes incrementally on
a daily basis.  (That detail is relevant to question 3 below.)

Now we are ready to start building out the front end, which will allow a
team of data scientists to submit their problems to the system via a web UI
(the web tier will be Java).  Users could of course be submitting jobs more
or less simultaneously.  We want to make sure we understand how best to
structure this.

Questions:  

1 - Does a new SparkContext get created in the web tier for each new request
for processing?  

2 - If so, how much time should we expect it to take to set up the context?
Our goal is to return a response to the users in under 10 seconds, but if it
takes many seconds to create a new context or otherwise set up the job, then
we need to adjust our expectations for what is possible.  From using
spark-shell one might conclude that it takes more than 10 seconds to create a
context; however, it's not clear how much of that is context creation vs.
other things.

3 - (This last question perhaps deserves a post in and of itself.)  If every
job is always comparing some little data structure to the same HDFS corpus of
data, what is the best pattern to use to cache the RDDs from HDFS so they
don't always have to be reconstituted from disk?  I.e. how can RDDs be shared
from the context of one job to the context of subsequent jobs?  Or does
something like memcache have to be used?  (A rough sketch follows below.)
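
To make question 3 concrete, here is roughly the pattern we are imagining, sketched in Scala (every name and path is made up, and we don't know whether a single long-lived context like this is even the recommended approach -- that is part of the question):

import org.apache.spark.{SparkConf, SparkContext}

// One long-lived context owned by the web tier (an assumption, not a recommendation);
// master and other settings would come from however the web tier is deployed
object SharedSpark {
  val sc = new SparkContext(new SparkConf().setAppName("web-controller"))

  // Corpus loaded once and kept in memory; would be refreshed when the daily delta lands
  val corpus = sc.textFile("hdfs:///corpus/*").cache()
}

// Each incoming web request would then reuse SharedSpark.sc and SharedSpark.corpus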

Thanks!
David



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-multi-user-web-controller-in-front-of-Spark-tp18581.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
