Re: OOM writing out sorted RDD

2014-08-09 Thread Bharath Ravi Kumar
Update: as expected, switching to Kryo merely delays the inevitable. Does
anyone have experience controlling memory consumption while processing
(e.g. writing out) imbalanced partitions?
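
For reference, a minimal sketch of what passing an explicit partition count to
the shuffle operators (rather than relying on spark.default.parallelism) would
look like. The RDD name "pairs", the partition count and the output path are
placeholders; this spreads out skewed partitions but cannot split a single
dominant key:

import org.apache.spark.SparkContext._   // pair-RDD / ordered-RDD implicits (pre-1.3)

val numParts = 512                        // placeholder; well above 2x the total cores

// both shuffle operators accept an explicit partition count, so the skew from
// the range partitioning gets spread over finer-grained partitions
val sorted  = pairs.sortByKey(ascending = true, numPartitions = numParts)
val grouped = sorted.groupByKey(numParts)
grouped.saveAsTextFile("hdfs:///out/by-rank")   // placeholder output path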
On 09-Aug-2014 10:41 am, "Bharath Ravi Kumar"  wrote:

> Our prototype application reads a 20GB dataset from HDFS (nearly 180
> partitions), groups it by key, sorts by rank, and writes the result out to HDFS in that
> order. The job runs against two nodes (16G, 24 cores per node available to
> the job). I noticed that the execution plan results in two sortByKey
> stages, followed by groupByKey and a saveAsTextFile. The shuffle write for
> groupByKey is ~20G in size. During the saveAsTextFile stage, however, after
> writing out 50% of the partitions, the memory on one of the executors
> shoots up to 16G and the executor spends all its time in GCs. Eventually,
> the logs show an OOM [1] included at the end of the mail followed by
> another TID loss to "FileSystem closed" errors indicated in stacktrace [2].
> I noticed that the partitions may be skewed as a result of the sort, with
> one or two partitions having up to 10% of all rows. I also noticed that the
> data written out until the 50% stage (when memory shoots up) had a large
> number of empty part- files followed by a few files of 200M in size. It
> could hence be that the attempt to write out one large partition is causing
> the OOM. The tuning documentation states that a larger level of parallelism
> might help mitigate the problem, but setting default parallelism to 2x the
> number of cores didn't help either. While I could attempt to partition
> manually (choosing custom ranges for a range partitioner), it appears that
> limiting read sizes (from the earlier shuffle) during the write to HDFS
> should make it possible to write out even heavily loaded partitions. Are
> there parameters that might help achieve that?
> (On a related note, any idea if using Kryo serialization would help in
> this case?)
>
> Thanks,
> Bharath
>
> [1]
> 14/08/09 04:59:26 WARN TaskSetManager: Lost TID 711 (task 33.0:137)
> 14/08/09 04:59:26 WARN TaskSetManager: Loss was due to
> java.lang.OutOfMemoryError
> java.lang.OutOfMemoryError: Java heap space
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1989)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
> at
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
> at
> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at
> org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1031)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at
> org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to
> (TraversableOnce.scala:273)
> at org.apache.spark.InterruptibleIterator.to
> (InterruptibleIterator.scala:28)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at
> org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at
> org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
> at
> org.apache.spark.rdd.OrderedRDDFunctions$$anonf

Spark SQL JSON dataset query nested datastructures

2014-08-09 Thread Sathish Kumaran Vairavelu
I have a simple JSON dataset as below. How do I query all parts.lock values
for id=1?

JSON: { "id": 1, "name": "A green door", "price": 12.50, "tags": ["home",
"green"], "parts" : [ { "lock" : "One lock", "key" : "single key" }, {
"lock" : "2 lock", "key" : "2 key" } ] }

Query: select id, name, price, parts.lock from product where id=1

The point is, if I use parts[0].lock, it returns one row, as below:

{u'price': 12.5, u'id': 1, u'.lock': {u'lock': u'One lock', u'key':
u'single key'}, u'name': u'A green door'}

But I want to return all the locks in the parts structure. That will
return multiple rows, but that is what I am looking for. It is a kind of
relational join that I want to accomplish.
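
Something along these lines is what I am after - a sketch only, assuming a
HiveContext is available (LATERAL VIEW explode is HiveQL) and that the JSON
data has already been registered as a table named product; I have not
verified this:

import org.apache.spark.sql.hive.HiveContext

val hiveCtx = new HiveContext(sc)

// one output row per element of the parts array, i.e. one row per lock
// (on Spark 1.0.x this call may need to be hiveCtx.hql(...) instead of sql(...))
val locks = hiveCtx.sql("""
  SELECT id, name, price, p.lock
  FROM product
  LATERAL VIEW explode(parts) partsTable AS p
  WHERE id = 1
""")
locks.collect().foreach(println)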

Please help me with this


Overriding dstream window definition

2014-08-09 Thread Ruchir Jha
Hi

I intend to use the same Spark Streaming program for both real-time and
batch processing of my time-stamped data. However, with batch processing all
window-based operations would be meaningless because (I assume) the window
is defined by the arrival times of the data, and it is not possible to define
it using the actual timestamp on the data itself (which is a field on
my Java object that represents a datapoint in my time series).
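
To make the question concrete, this is roughly what I would want to express -
a sketch only, where Datapoint/getTimestamp stand in for my Java object and
stream is the input DStream[Datapoint]:

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext._  // pair-DStream implicits (pre-1.3)

val bucketMs = 60 * 1000L  // 1-minute buckets in event time

val perEventTimeBucket = stream
  // key each record by a bucket derived from its own timestamp field,
  // not by its arrival time
  .map(dp => (dp.getTimestamp / bucketMs, dp))
  // the processing-time window only bounds how long we wait for stragglers;
  // the grouping itself is by the event-time bucket
  .groupByKeyAndWindow(Seconds(120), Seconds(60))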

This seems like one of those things that other people may have run into and
I am hoping you experts could help me with this or clarify my concept if my
understanding ie incorrect

Ruchir


Re: No space left on device

2014-08-09 Thread Jim Donahue
Root partitions on AWS instances tend to be small (for example, an m1.large 
instance has two 420 GB drives, but only a 10 GB root partition).  Matei's 
probably right about this - you just need to be careful where things like the 
logs get stored.

From: Matei Zaharia <matei.zaha...@gmail.com>
Date: Saturday, August 9, 2014 at 1:48 PM
To: "u...@spark.incubator.apache.org" <u...@spark.incubator.apache.org>, kmatzen <kmat...@gmail.com>
Subject: Re: No space left on device

Your map-only job should not be shuffling, but if you want to see what's 
running, look at the web UI at http://:4040. In fact the job should not 
even write stuff to disk except inasmuch as the Hadoop S3 library might build 
up blocks locally before sending them on.

My guess is that it's not /mnt or /mnt2 that get filled, but the root volume, 
/, either with logs or with temp files created by the Hadoop S3 library. You 
can check this by running "df" while the job is executing. (Tools like Ganglia 
can probably also log this.) If it is the logs, you can symlink the spark/logs 
directory to someplace on /mnt instead. If it's /tmp, you can set 
java.io.tmpdir to another directory in Spark's JVM options.

Matei


On August 8, 2014 at 11:02:48 PM, kmatzen 
(kmat...@gmail.com) wrote:

I need some configuration / debugging recommendations to work around "no
space left on device". I am completely new to Spark, but I have some
experience with Hadoop.

I have a task where I read images stored in sequence files from s3://,
process them with a map in scala, and write the result back to s3://. I
have about 15 r3.8xlarge instances allocated with the included ec2 script.
The input data is about 1.5 TB and I expect the output to be similarly
sized. 15 r3.8xlarge instances give me about 3 TB of RAM and 9 TB of
storage, so hopefully more than enough for this task.

What happens is that it takes about an hour to read in the input from S3.
Once that is complete, then it begins to process the images and several
succeed. However, quickly, the job fails with "no space left on device".
By the time I can ssh into one of the machines that reported the error, temp
files have already been cleaned up. I don't see any more detailed messages
in the slave logs. I have not yet changed the logging configuration from
the default.

The S3 input and output are cached in /mnt/ephemeral-hdfs/s3 and
/mnt2/ephemeral-hdfs/s3 (I see mostly input files at the time of failure,
but maybe 1 output file per slave). Shuffle files are generated in
/mnt/spark/ and /mnt2/spark/ (they were cleaned up
once the job failed and I don't remember the directory that I saw while it
was still running). I checked the disk utilization for a few slaves while
running the pipeline and they were pretty far away from being full. But the
failure probably came from a slave that was overloaded from a shard
imbalance (but why would that happen on read -> map -> write?).

What other things might I need to configure to prevent this error? What
logging options do people recommend? Is there an easy way to diagnose spark
failures from the web interface like with Hadoop?

I need to do some more testing to make sure I'm not emitting a giant image
for a malformed input image, but I figured I'd post this question early in
case anyone had any recommendations.

BTW, why does a map-only job need to shuffle? I was expecting it to
pipeline the transfer in from S3 operation, the actual computation
operation, and the transfer back out to S3 operation rather than doing
everything serially with a giant disk footprint. Actually, I was thinking
it would fuse all three operations into a single stage. Is that not what
Spark does?








feature space search

2014-08-09 Thread filipus
I am wondering if I can use Spark to search for interesting
features/attributes for modelling. In fact I have just come from some
introductory pages about Vowpal Wabbit, and I somehow like the idea of
out-of-core modelling.

Well, I have transactional data where customers purchased products with
unique article numbers, plus a huge table of customer treatments such as
coupons, prospects and so on. Each article has some properties or attributes
which are written in another table.

In fact I would like to join the tables on the fly as input for a machine
learning platform like Spark or Vowpal Wabbit, just to get a ranking of good
attributes for modelling.

Does somebody know how to do it?
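
Roughly, this is the kind of on-the-fly join I have in mind (a sketch; the
paths, delimiter and field positions are made up):

import org.apache.spark.SparkContext._   // pair-RDD implicits (pre-1.3)

val transactions = sc.textFile("hdfs:///data/transactions.csv")
  .map(_.split(";"))
  .map(f => (f(1), f(0)))                // (articleNo, customerId)

val articleAttrs = sc.textFile("hdfs:///data/articles.csv")
  .map(_.split(";"))
  .map(f => (f(0), f.drop(1).toSeq))     // (articleNo, attribute values)

// one record per purchase, enriched with the article's attributes, ready to
// feed into a ranking / feature-selection step (MLlib, or exported to VW)
val enriched = transactions.join(articleAttrs)
  .map { case (articleNo, (customerId, attrs)) => (customerId, articleNo, attrs) }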

all the best :-)






Re: No space left on device

2014-08-09 Thread Matei Zaharia
Your map-only job should not be shuffling, but if you want to see what's 
running, look at the web UI at http://:4040. In fact the job should not 
even write stuff to disk except inasmuch as the Hadoop S3 library might build 
up blocks locally before sending them on.

My guess is that it's not /mnt or /mnt2 that get filled, but the root volume, 
/, either with logs or with temp files created by the Hadoop S3 library. You 
can check this by running "df" while the job is executing. (Tools like Ganglia 
can probably also log this.) If it is the logs, you can symlink the spark/logs 
directory to someplace on /mnt instead. If it's /tmp, you can set 
java.io.tmpdir to another directory in Spark's JVM options.
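
For example, a sketch of passing both settings from the application itself -
the paths are illustrative, the property names assume Spark 1.0+, and older
deployments would use SPARK_JAVA_OPTS / SPARK_LOCAL_DIRS instead:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("s3-image-pipeline")   // placeholder name
  // point the executors' temp dir at the large ephemeral volume instead of /
  .set("spark.executor.extraJavaOptions", "-Djava.io.tmpdir=/mnt/tmp")
  // shuffle/spill directories (a comma-separated list is allowed)
  .set("spark.local.dir", "/mnt/spark,/mnt2/spark")
val sc = new SparkContext(conf)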

Matei

On August 8, 2014 at 11:02:48 PM, kmatzen (kmat...@gmail.com) wrote:

I need some configuration / debugging recommendations to work around "no 
space left on device". I am completely new to Spark, but I have some 
experience with Hadoop. 

I have a task where I read images stored in sequence files from s3://, 
process them with a map in scala, and write the result back to s3://. I 
have about 15 r3.8xlarge instances allocated with the included ec2 script. 
The input data is about 1.5 TB and I expect the output to be similarly 
sized. 15 r3.8xlarge instances give me about 3 TB of RAM and 9 TB of 
storage, so hopefully more than enough for this task. 

What happens is that it takes about an hour to read in the input from S3. 
Once that is complete, then it begins to process the images and several 
succeed. However, quickly, the job fails with "no space left on device". 
By the time I can ssh into one of the machines that reported the error, temp 
files have already been cleaned up. I don't see any more detailed messages 
in the slave logs. I have not yet changed the logging configuration from 
the default. 

The S3 input and output are cached in /mnt/ephemeral-hdfs/s3 and 
/mnt2/ephemeral-hdfs/s3 (I see mostly input files at the time of failure, 
but maybe 1 output file per slave). Shuffle files are generated in 
/mnt/spark/ and /mnt2/spark/ (they were cleaned up 
once the job failed and I don't remember the directory that I saw while it 
was still running). I checked the disk utilization for a few slaves while 
running the pipeline and they were pretty far away from being full. But the 
failure probably came from a slave that was overloaded from a shard 
imbalance (but why would that happen on read -> map -> write?). 

What other things might I need to configure to prevent this error? What 
logging options do people recommend? Is there an easy way to diagnose spark 
failures from the web interface like with Hadoop? 

I need to do some more testing to make sure I'm not emitting a giant image 
for a malformed input image, but I figured I'd post this question early in 
case anyone had any recommendations. 

BTW, why does a map-only job need to shuffle? I was expecting it to 
pipeline the transfer in from S3 operation, the actual computation 
operation, and the transfer back out to S3 operation rather than doing 
everything serially with a giant disk footprint. Actually, I was thinking 
it would fuse all three operations into a single stage. Is that not what 
Spark does? 








How to read zip files from HDFS into spark-shell using scala

2014-08-09 Thread Alton Alexander
I've tried uploading a zip file that contains a CSV to HDFS and then
reading it into Spark using spark-shell, and the first line is all messed
up. However, when I upload a gzip to HDFS and then read it into Spark
it does just fine. See output below:

Is there a way to read a zip file as-is from HDFS in Spark?


scala> val data =
sc.textFile("hdfs://alexander1:9000/user/root/daily_42602_2014.csv.zip").cache
data: org.apache.spark.rdd.RDD[String] = MappedRDD[7] at textFile at
<console>:12

scala> data.first
res6: String = PK� ?E)�??���??? ?daily_42602_2014.csvUT
??�a�S�a�Sux ???��???��[� Ǖ� ���H�6ۻ�,�?~w�~?IH�.?�?
���V��J�?�t?Hg�}��
��̕1"3R*��d]DR�?��p��1��_��o�}���_�_?~�{�_y��_�ݯ�_�?�o��}yx���?���'���?��?
�_��}�)�}��??�}(|�<���D��?�/û������7��m����~=�s��Y/�
w �z?�
scala> val data =
sc.textFile("hdfs://alexander1:9000/user/root/daily_42602_2014.csv.gz").cache
data: org.apache.spark.rdd.RDD[String] = MappedRDD[9] at textFile at
<console>:12

scala> data.first
res7: String = "State Code","County Code","Site Num","Parameter
Code","POC","Latitude","Longitude","Datum","Parameter Name","Sample
Duration","Pollutant Standard","Date Local","Units of Measure","Event
Type","Observation Count","Observation Percent","Arithmetic Mean","1st
Max Value","1st Max Hour","AQI","Method Name","Local Site
Name","Address","State Name","County Name","City Name","CBSA
Name","Date of Last Change"




Re: How to share a NonSerializable variable among tasks in the same worker node?

2014-08-09 Thread Kevin James Matzen
I have a related question.  With Hadoop, I would do the same thing for
non-serializable objects and setup().  I also had a use case where it
was so expensive to initialize the non-serializable object that I
would make it a static member of the mapper, turn on JVM reuse across
tasks, and then prevent the reinitialization for every task on the
same node.  Is that easy to do with Spark?  Assuming Spark reuses the
JVM across tasks by default, then taking raofengyun's factory method
and having it return a singleton should work, right?  Does Spark reuse JVMs
across tasks?
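
Concretely, this is the pattern I am hoping works (a sketch; the parser class
is hypothetical, and "lines" is the RDD from the quoted code below). Since an
executor JVM is reused across the tasks of an application, a lazy val in an
object should behave like a per-executor singleton:

object ParserHolder {
  lazy val parser = new ExpensiveNonSerializableParser()  // built once per JVM
}

val parsed = lines.mapPartitions { iter =>
  // mapPartitions is the closest analogue of setup(): this runs once per
  // partition, and the lazy val above is initialized at most once per executor
  val p = ParserHolder.parser
  iter.map(line => p.parse(line))
}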

On Sat, Aug 9, 2014 at 7:48 AM, Fengyun RAO  wrote:
> Although nobody has answered the two questions, in my practice it seems both
> are yes.
>
>
> 2014-08-04 19:50 GMT+08:00 Fengyun RAO :
>>
>> object LogParserWrapper {
>> private val logParser = {
>> val settings = new ...
>> val builders = new 
>> new LogParser(builders, settings)
>> }
>> def getParser = logParser
>> }
>>
>> object MySparkJob {
>>def main(args: Array[String]) {
>> val sc = new SparkContext()
>> val lines = sc.textFile(arg(0))
>>
>> val parsed = lines.map(line =>
>> LogParserWrapper.getParser.parse(line))
>> ...
>> }
>>
>> Q1: Is this the right way to share LogParser instance among all tasks on
>> the same worker, if LogParser is not serializable?
>>
>> Q2: LogParser is read-only, but can LogParser hold a cache field such as a
>> ConcurrentHashMap where all tasks on the same worker try to get() and put()
>> items?
>>
>>
>> 2014-08-04 19:29 GMT+08:00 Sean Owen :
>>
>>> The issue is that it's not clear what "parser" is. It's not shown in
>>> your code. The snippet you show does not appear to contain a parser
>>> object.
>>>
>>> On Mon, Aug 4, 2014 at 10:01 AM, Fengyun RAO 
>>> wrote:
>>> > Thanks, Sean!
>>> >
>>> > It works, but as the link in 2 - Why Is My Spark Job so Slow and Only
>>> > Using
>>> > a Single Thread? says " parser instance is now a singleton created in
>>> > the
>>> > scope of our driver program" which I thought was in the scope of
>>> > executor.
>>> > Am I wrong, or why?
>>> >
>>> > I didn't want the equivalent of "setup()" method, since I want to share
>>> > the
>>> > "parser" among tasks in the same worker node. It takes tens of seconds
>>> > to
>>> > initialize a "parser". What's more, I want to know if the "parser"
>>> > could
>>> > have a field such as ConcurrentHashMap which all tasks in the node may
>>> > get()
>>> > or put() items.
>>> >
>>> >
>>> >
>>> >
>>> > 2014-08-04 16:35 GMT+08:00 Sean Owen :
>>> >
>>> >> The parser does not need to be serializable. In the line:
>>> >>
>>> >> lines.map(line => JSONParser.parse(line))
>>> >>
>>> >> ... the parser is called, but there is no parser object with state
>>> >> that needs to be serialized. Are you sure it does not work?
>>> >>
>>> >> The error message alluded to originally refers to an object not shown
>>> >> in the code, so I'm not 100% sure this was the original issue.
>>> >>
>>> >> If you want, the equivalent of "setup()" is really "writing some code
>>> >> at the start of a call to mapPartitions()"
>>> >>
>>> >> On Mon, Aug 4, 2014 at 8:40 AM, Fengyun RAO 
>>> >> wrote:
>>> >> > Thanks, Ron.
>>> >> >
>>> >> > The problem is that the "parser" is written in another package which
>>> >> > is
>>> >> > not
>>> >> > serializable.
>>> >> >
>>> >> > In mapreduce, I could create the "parser" in the map setup() method.
>>> >> >
>>> >> > Now in spark, I want to create it for each worker, and share it
>>> >> > among
>>> >> > all
>>> >> > the tasks on the same worker node.
>>> >> >
>>> >> > I know different workers run on different machine, but it doesn't
>>> >> > have
>>> >> > to
>>> >> > communicate between workers.
>>> >
>>> >
>>
>>
>




Re: KMeans Input Format

2014-08-09 Thread AlexanderRiggers
Thank you for your help. After restructuring my code per Sean's input, it
worked without changing the Spark context.  I then took the same file format,
just a bigger file (2.7 GB), from S3 to my cluster with 4 c3.xlarge instances
and Spark 1.0.2. Unluckily, my task freezes again after a short time. I tried
it with cached and uncached RDDs. Are there configurations to be made for
such big files and MLlib?

scala> import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.clustering.KMeans

scala> import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.clustering.KMeansModel

scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors

scala> 

scala> // Load and parse the data

scala> val data = sc.textFile("s3n://ampcamp-arigge/large_file.new.txt")
14/08/09 14:58:31 INFO storage.MemoryStore: ensureFreeSpace(35666) called
with curMem=0, maxMem=309225062
14/08/09 14:58:31 INFO storage.MemoryStore: Block broadcast_0 stored as
values to memory (estimated size 34.8 KB, free 294.9 MB)
data: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at
<console>:15

scala> val parsedData = data.map(s => Vectors.dense(s.split('
').map(_.toDouble)))
parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] =
MappedRDD[2] at map at <console>:17

scala> val train = parsedData.repartition(20).cache() 
train: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] =
MappedRDD[6] at repartition at <console>:19

scala> 

scala> // Set model and run it

scala> val model = new KMeans().
 | setInitializationMode("k-means||").
 | setK(2).setMaxIterations(2).
 | setEpsilon(1e-4).
 | setRuns(1).
 | run(parsedData)
14/08/09 14:58:33 WARN snappy.LoadSnappy: Snappy native library is available
14/08/09 14:58:33 INFO util.NativeCodeLoader: Loaded the native-hadoop
library
14/08/09 14:58:33 INFO snappy.LoadSnappy: Snappy native library loaded
14/08/09 14:58:34 INFO mapred.FileInputFormat: Total input paths to process
: 1
14/08/09 14:58:34 INFO spark.SparkContext: Starting job: takeSample at
KMeans.scala:260
14/08/09 14:58:34 INFO scheduler.DAGScheduler: Got job 0 (takeSample at
KMeans.scala:260) with 2 output partitions (allowLocal=false)
14/08/09 14:58:34 INFO scheduler.DAGScheduler: Final stage: Stage
0(takeSample at KMeans.scala:260)
14/08/09 14:58:34 INFO scheduler.DAGScheduler: Parents of final stage:
List()
14/08/09 14:58:34 INFO scheduler.DAGScheduler: Missing parents: List()
14/08/09 14:58:34 INFO scheduler.DAGScheduler: Submitting Stage 0
(MappedRDD[10] at map at KMeans.scala:123), which has no missing parents
14/08/09 14:58:34 INFO scheduler.DAGScheduler: Submitting 2 missing tasks
from Stage 0 (MappedRDD[10] at map at KMeans.scala:123)
14/08/09 14:58:34 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with
2 tasks
14/08/09 14:58:34 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID
0 on executor 5: ip-172-31-16-25.ec2.internal (PROCESS_LOCAL)
14/08/09 14:58:34 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as
2215 bytes in 2 ms
14/08/09 14:58:34 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID
1 on executor 4: ip-172-31-16-24.ec2.internal (PROCESS_LOCAL)
14/08/09 14:58:34 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as
2215 bytes in 1 ms
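
One thing I still have to try: the transcript above passes parsedData to
run(), so the repartitioned, cached train RDD is never actually used. A small
variant that trains on it (it may or may not help with the freeze):

val model = new KMeans().
 setInitializationMode("k-means||").
 setK(2).setMaxIterations(2).
 setEpsilon(1e-4).
 setRuns(1).
 run(train)   // the 20-partition cached RDD instead of parsedData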








Re: How to share a NonSerializable variable among tasks in the same worker node?

2014-08-09 Thread Fengyun RAO
Although nobody has answered the two questions, in my practice it seems both
are yes.


2014-08-04 19:50 GMT+08:00 Fengyun RAO :

> object LogParserWrapper {
> private val logParser = {
> val settings = new ...
> val builders = new 
> new LogParser(builders, settings)
> }
> def getParser = logParser
> }
>
> object MySparkJob {
>def main(args: Array[String]) {
> val sc = new SparkContext()
> val lines = sc.textFile(arg(0))
>
> val parsed = lines.map(line =>
> LogParserWrapper.getParser.parse(line))
> ...
> }
>
> Q1: Is this the right way to share LogParser instance among all tasks on
> the same worker, if LogParser is not serializable?
>
> Q2: LogParser is read-only, but can LogParser hold a cache field such as a
> ConcurrentHashMap where all tasks on the same worker try to get() and put()
> items?
>
>
> 2014-08-04 19:29 GMT+08:00 Sean Owen :
>
> The issue is that it's not clear what "parser" is. It's not shown in
>> your code. The snippet you show does not appear to contain a parser
>> object.
>>
>> On Mon, Aug 4, 2014 at 10:01 AM, Fengyun RAO 
>> wrote:
>> > Thanks, Sean!
>> >
>> > It works, but as the link in 2 - Why Is My Spark Job so Slow and Only
>> Using
>> > a Single Thread? says " parser instance is now a singleton created in
>> the
>> > scope of our driver program" which I thought was in the scope of
>> executor.
>> > Am I wrong, or why?
>> >
>> > I didn't want the equivalent of "setup()" method, since I want to share
>> the
>> > "parser" among tasks in the same worker node. It takes tens of seconds
>> to
>> > initialize a "parser". What's more, I want to know if the "parser" could
>> > have a field such as ConcurrentHashMap which all tasks in the node may
>> get()
>> > or put() items.
>> >
>> >
>> >
>> >
>> > 2014-08-04 16:35 GMT+08:00 Sean Owen :
>> >
>> >> The parser does not need to be serializable. In the line:
>> >>
>> >> lines.map(line => JSONParser.parse(line))
>> >>
>> >> ... the parser is called, but there is no parser object with state
>> >> that needs to be serialized. Are you sure it does not work?
>> >>
>> >> The error message alluded to originally refers to an object not shown
>> >> in the code, so I'm not 100% sure this was the original issue.
>> >>
>> >> If you want, the equivalent of "setup()" is really "writing some code
>> >> at the start of a call to mapPartitions()"
>> >>
>> >> On Mon, Aug 4, 2014 at 8:40 AM, Fengyun RAO 
>> wrote:
>> >> > Thanks, Ron.
>> >> >
>> >> > The problem is that the "parser" is written in another package which
>> is
>> >> > not
>> >> > serializable.
>> >> >
>> >> > In mapreduce, I could create the "parser" in the map setup() method.
>> >> >
>> >> > Now in spark, I want to create it for each worker, and share it among
>> >> > all
>> >> > the tasks on the same worker node.
>> >> >
>> >> > I know different workers run on different machine, but it doesn't
>> have
>> >> > to
>> >> > communicate between workers.
>> >
>> >
>>
>
>


set SPARK_LOCAL_DIRS issue

2014-08-09 Thread Baoqiang Cao
Hi

I'm trying to use a specific dir for the Spark working directory since I have 
limited space at /tmp. I tried: 
1)
export SPARK_LOCAL_DIRS="/mnt/data/tmp"
or 2)
SPARK_LOCAL_DIRS="/mnt/data/tmp" in spark-env.sh

But neither worked, since the output of Spark still says: 

ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes 
to file /tmp/spark-local-20140809134509-0502/34/shuffle_0_436_1
java.io.FileNotFoundException: 
/tmp/spark-local-20140809134509-0502/34/shuffle_0_436_1 (No space left on 
device)

Can anybody help with correctly setting up the "tmp" directory?
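
In case it matters, a minimal sketch of setting the same thing from inside the
application instead (Spark 1.x; note that SPARK_LOCAL_DIRS exported in the
worker's environment takes precedence over this property, so spark-env.sh has
to exist on every worker and the workers must be restarted for it to take
effect):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("MyJob")                      // placeholder
  .set("spark.local.dir", "/mnt/data/tmp")  // scratch dir instead of /tmp
val sc = new SparkContext(conf)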

Best,
Baoqiang Cao
Blog: http://baoqiang.org
Email: bqcaom...@gmail.com